• Redis之分布式锁


    1. 什么是分布式锁

    我们直接在并发编程阶段学习过synchronized lock这些都是单机锁。什么是锁呢? 就是我们有一些互斥资源,不能并行执行,需要一个东西来保证是串行执行的   

    单机锁的弊端

     如图,有3个独立的订单服务,4个用户并发从客端访问,用户12访问订单服务1,由于单机锁,所以可以有一个用户能够进行支付操作,用户3 用户4分别进入订单服务2 订单服务3 也能够进行支付操作,因此我们想让4个用户在一个时间内只有一个用户能够进行支付操作,单机锁就无法做到

    所以就引入了分布式锁

    分布式锁是把锁放在第三方,而不是在单机服务中,用户1-4 获取锁都到第三方服务中去获取,这样就可以保证分布式服务下的支付操作时是串行执行的了 

    2. 自己设计一个锁,如何实现

    需求: 

    a.只能有一个线程能同时执行互斥的资源

    b.其它的线程执行的时候,有线程在执行的话,要么等待,要么报错

    需要实现这个需求的3个重要条件

    1. 有一个标记来标记是不是有线程在执行

    2. 这个标记必须可见 保证相互之间这个标记是能拿到最新的结果的  volatile

    3. 获取这个标记不能同时抢占成功 安全 (lock中是使用cas机制来保证的)

    3. Redis中怎么做分布式锁

    a. 标记 redis是k-v结构 key作为一个标记,这个key存在的话说明有了标记,key不存在,说明没有标记

    b.标记的可见性  单线程命令执行,  必须要set完 你才能get

    c. 保证原子性  setnx 单线执行 

    setnx其实等价于两个命令(exists 和 set),假如我不想setnx,那么流程就等价于:

    1. if(exists(order:111)==0) //没锁
    2. {
    3. set(order:111,1)
    4. }

     但是如果这样写,就会发现一个问题,这两个指令不是原子性的,就会导致同时进入判断中,得到没锁的场景

     因此我们应该思考的是怎么保证多个指令的原子性?

    3.1 Redis事务

    redis的开启提交回滚命令分别为multi、exec、discard

    但是事务的特点是

    1.命令是原子的 在执行事务中的指令时,服务器阻塞,不能执行其他指令
    2.不能拿到事务中间的指令的结果 来决定后面指令的逻辑。

    因此,不能够通过判断而决定后面的逻辑,事务不能够满足我们上述的需求,因而采用Lua脚本 

    3.2 Lua脚本

    lua语言是一个轻量级的脚本语言,能够完美的融入redis中。

    redis调用lua脚本 采用eval指令

    预填充键值 如果传入的键是1 返回第一个参数 否则返回第二个参数

    127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]
    end return ARGV[2] " 1 1 'HUIHUI' 'XIAOHUIHUI'
    "HUIHUI"
    127.0 . 0.1 : 6379 > eval "if KEYS[1]=='1' then return ARGV[1]
    end return ARGV[2] " 1 2 'HUIHUI' 'XIAOHUIHUI'
    "XIAOHUIHUI"

     既然可以传入键值对,我们能不能在lua脚本里面再去执行redis的指令,并且可以根据返回的结果继续执行后续的逻辑。

    判断一个key是不是存在,如果不存在,则调用set的redis指令,在lua脚本调用redis指令,用redis.call

    127.0 . 0.1 : 6379 > eval "local
    key=redis.call('exists',KEYS[1]) if key==0 then return
    redis.call('set',KEYS[1],ARGV[1]) end return 1" 1 name
    HUIHUI
    OK
    127.0 . 0.1 : 6379 > get name
    "HUIHUI"

    Lua脚本中的指令是原子的,在执行lua脚本期间,其它指令阻塞,必须等待lua脚本的指令执行完毕。 所以单个lua脚本的指令不宜有太多指令。

    lua脚本的使用场景

    1.需要原子性地执行多个命令
    2.需要中间值来组合后面的命令
    3.需要中间值来编排后面的命令

    3.3 Demo

    我们使用lua脚本来保证多个指令的原子性,实现我们的分布式锁。

    1. private static String script = "" +
    2. "local lockSet = redis.call('exists', KEYS[1])\n"
    3. + "if lockSet == 0 then\n" +
    4. "redis.call('set', KEYS[1], ARGV[2])\n" +
    5. //设置过期时间,防止死锁
    6. "redis.call('expire', KEYS[1], ARGV[2])\n" +
    7. "end\n" +
    8. "return lockSet\n";

     加锁方法

    1. public String pay(Long orderId) {
    2. try {
    3. Long lock = (Long)
    4. redisTemplate.execute(RedisScript.of(script, Long.class),
    5. Arrays.asList("pay:" + orderId), "1", 30);
    6. if (lock == 0) {
    7. //模拟支付业务代码执行10s
    8. Thread.sleep(10000);
    9. //处理完业务逻辑删除锁 异常了
    10. redisTemplate.delete("lock");
    11. }
    12. return lock == 0 ? "正常支付完毕" : "请稍等,已经有人在支付!!";
    13. } catch (Exception exception) {
    14. redisTemplate.delete("lock" + orderId);
    15. return "系统异常";
    16. }
    17. }

    对外接口

    1. @RequestMapping(value = "/pay",method = RequestMethod.GET)
    2. @ResponseBody
    3. public String pay(@RequestParam("id") Long id) throws InterruptedException {
    4. return productService.pay(id);
    5. }

     思考:

    • 做重入锁怎么做? 同一个线程能加锁加多次 (redis中的hash)
      • 互斥的key  (大key)
      • 知道线程信息  (field)
      • 保存重入次数 value  hincrby 线程安全的
    • 假如锁30s会过期,但是业务没有执行完,锁就会失效

    4. Redission锁原理源码分析

    4.1 Redission使用

    a. 导入redission客户端包

    1. <dependency>
    2. <groupId>org.redissongroupId>
    3. <artifactId>redissonartifactId>
    4. <version>3.5.4version>
    5. dependency>

    b.初始化RedissionClient,将RedissionClient交给Spring容器管理

    1. @Bean
    2. RedissonClientgetRedissonClient(){
    3. Configconfig=newConfig();
    4.    config.useClusterServers()    //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
    5. .addNodeAddress("redis://192.168.8.127:6381")
    6. .addNodeAddress("redis://192.168.8.128:6381")
    7. .addNodeAddress("redis://192.168.8.129:6381")
    8. .addNodeAddress("redis://192.168.8.127:6380")
    9. .addNodeAddress("redis://192.168.8.128:6380")
    10. .addNodeAddress("redis://192.168.8.129:6380");
    11. return Redisson.create(config);
    12. }

    c. 调用Redission的分布式锁

    1. public String payRedisson(Long orderId)throws InterruptedException{
    2. RLockr Lock1=redissonClient.getLock("order_lock"+orderId);
    3. if(rLock1.tryLock(-1,-1,TimeUnit.SECONDS)){
    4.      rLock1.tryLock(-1,-1,TimeUnit.SECONDS);
    5.      System.out.println("获取锁成功");
    6.      Thread.sleep(500000);
    7.      rLock1.unlock();
    8.      return"处理完成";
    9. } else {
    10. System.out.println("获取锁失败");    
    11. return"请稍等,已经有人在支付!!";
    12. }

     d. 暴露接口

    1. @RequestMapping(value="/pay_lock",method= RequestMethod.GET)
    2. @ResponseBody
    3. public String payRedisson(@RequestParam("id") Long id) throws InterruptedException{
    4. return product Service.payRedisson(id);
    5. }

     4.2 Reddission分布式流程图

     4.3 Reddison源码解析

    4.3.1 初始化锁对象

    RLockr Lock1=redissonClient.getLock("order_lock"+orderId);

    主要初始化3个方法: RedissonLock、RedissonBaseLock、RedissonObject 

    主要是一些任务执行器、大key,UUID以及过期时间、订阅服务的一些初始化

    4.3.2 加锁方法

    1. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    2. long time = unit.toMillis(waitTime); // 锁等待时间
    3. long current = System.currentTimeMillis(); // 当前时间
    4. final long threadId = Thread.currentThread().getId(); // 当前线程
    5. Long ttl = this.tryAcquire(leaseTime, unit, threadId); // 抢占锁
    6. if (ttl == null) { // 抢占成功 执行业务逻辑
    7. return true;
    8. } else { // 抢占失败的逻辑处理
    9. time -= System.currentTimeMillis() - current; // 看当前时间是否已经过了锁等待时间
    10. if (time <= 0L) { // 如果过了 直接失败
    11. this.acquireFailed(threadId);
    12. return false;
    13. } else { // 如果没过锁等待时间
    14. current = System.currentTimeMillis();
    15. // 当前线程的订阅 也就是说在锁等待时间内 如果锁释放了 则就进行直接订阅
    16. final RFuture subscribeFuture = this.subscribe(threadId);
    17. if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
    18. // 订阅失败 获取锁失败
    19. if (!subscribeFuture.cancel(false)) {
    20. subscribeFuture.addListener(new FutureListener() {
    21. public void operationComplete(Future future) throws Exception {
    22. if (subscribeFuture.isSuccess()) {
    23. RedissonLock.this.unsubscribe(subscribeFuture, threadId);
    24. }
    25. }
    26. });
    27. }
    28. this.acquireFailed(threadId);
    29. return false;
    30. } else {
    31. // 表示订阅成功了 就是监听到了锁释放 开始进行抢占锁
    32. boolean var14;
    33. try {
    34. time -= System.currentTimeMillis() - current;
    35. if (time > 0L) {
    36. boolean var16;
    37. do {
    38. long currentTime = System.currentTimeMillis();
    39. ttl = this.tryAcquire(leaseTime, unit, threadId);
    40. if (ttl == null) { // 表示抢占成功
    41. var16 = true;
    42. return var16;
    43. }
    44. time -= System.currentTimeMillis() - currentTime;
    45. if (time <= 0L) { // 已过锁等待时间 抢占失败
    46. this.acquireFailed(threadId);
    47. var16 = false;
    48. return var16;
    49. }
    50. currentTime = System.currentTimeMillis();
    51. // 如果没抢到锁,并且也还没到锁的释放时间 使用Semaphore进行阻塞,阻塞时间 : 剩余等待时间ttl< 锁释放时间time? 剩余等待时间: 锁释放时间
    52. if (ttl >= 0L && ttl < time) {
    53. this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    54. } else {
    55. this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    56. }
    57. time -= System.currentTimeMillis() - currentTime;
    58. } while(time > 0L);
    59. this.acquireFailed(threadId);
    60. var16 = false;
    61. return var16;
    62. }
    63. this.acquireFailed(threadId);
    64. var14 = false;
    65. } finally {
    66. this.unsubscribe(subscribeFuture, threadId);
    67. }
    68. return var14;
    69. }
    70. }
    71. }
    72. }

    整个逻辑的实现就是我们4.2的左半部分 

    下面我们首先看一个Semaphore的demo 知道是怎么进行阻塞的

    1. Semaphoresemaphore=newSemaphore(0);
    2. semaphore.release(); //释放semaphore才能后面判断为true if(semaphore.tryAcquire(10,TimeUnit.SECONDS))
    3. {
    4. System.out.println("10s过后");
    5. }
    6. System.out.println("1");

     接着我画了一个上述代码的时间轴图 可帮助理解

     4.3.2.1 tryAcquire(可重入锁)

    tracquire就是重入锁抢占的一个实现,我们主要看一下lua脚本的实现

    其中KEYS[1]是我的大key: order_lock111 ARGV[1]=30000 ARGV[2]:UUID+线程ID

    1. if(redis.call('exists',KEYS[1])==0)  //判断我的锁是否存在 =0为不存在没人抢占锁
    2. then
    3. redis.call('hincrby',KEYS[1],ARGV[2],1); //把我的小key+1
    4. redis.call('pexpire',KEYS[1],ARGV[1]); //设置过期时间30s
    5. eturnnil;
    6. end;
    7. if(redis.call('hexists',KEYS[1],ARGV[2])==1//进入该逻辑说明有线程抢占了锁继续判断是否同一个线程==1为同一线程
    8. then 
    9. redis.call('hincrby', KEYS[1],ARGV[2],1); 把我的小key+1 代表重入次数
    10. redis.call('pexpire',KEYS[1],ARGV[1]);  //设置过期时间30s
    11. returnnil;
    12. end;
    13. return redis.call('pttl',KEYS[1]); //前面2个if都没进,说明有人抢占并且不是同一线程,直接返回还有多少ms过期

    5. watchDog看门狗机制之时间轮算法

    watchDog看门够机制主要是为了解决锁时间到了,但是业务还没执行完,我希望我这个锁一直被我占有,我业务没执行完,就给我续上。怎么做?无非就是开个定时任务,定时的去判断下这个锁还存不存在,如果存在,续期,如果不存在,不续期。只有分布式锁中默认的30s时间才会触发看门狗机制

    定时调度任务的方式有很多,比如es-job ssl-job  rabbitmq死信队列等分布式的,还有Netty下面的时间轮算法,redis分布式就是采用的时间轮算法。

    5.1 hash轮定时器demo

    1. public static void main(String[] args) {
    2. Timer timer = new HashedWheelTimer(); // 创建一个HashedWheelTimer hash轮定时器
    3. // 提交一个任务 让它在10s后执行
    4. timer.newTimeout(new TimerTask() {
    5. @Override
    6. public void run(Timeout timeout) throws Exception {
    7. System.out.println("10s 后执行该任务1");
    8. }
    9. }, 10, TimeUnit.SECONDS);
    10. // 再提交一个任务,让它在20s后执行
    11. timer.newTimeout(new TimerTask() {
    12. @Override
    13. public void run(Timeout timeout) throws Exception {
    14. System.out.println("20s 后执行该任务1");
    15. }
    16. }, 20, TimeUnit.SECONDS);
    17. // 提交一个任务 让它在10s后执行
    18. timer.newTimeout(new TimerTask() {
    19. @Override
    20. public void run(Timeout timeout) throws Exception {
    21. System.out.println("5s 后执行该任务");
    22. }
    23. }, 5, TimeUnit.SECONDS);
    24. }

    执行结果:

     

    可以看到 hash轮定时器是只会延时执行一次。 

    5.1时间轮算法原理

    有家足浴店,他们的技师是这么安排的!
    总共有 8 个技师,分别是技师 1.2.3.4.5.6.7.8 号。
    技师 1 :负责 0-3 点的顾客
    技师 2 :负责 3-6 点的顾客
    技师 3 :负责 6-9 点的顾客
    技师 4 :负责 9-12 点的顾客
    技师 5 :负责 12-15 点的顾客
    技师 6 :负责 15-18 点的顾客
    技师 7 :负责 18-21 点的顾客
    技师 8 :负责 21-24 点的顾客
     
    那么,假如哪天放假,现在是早上 6 点,我想 9 个小时候去按摩,会是哪个技师为我服务?
    早点 6 点,一天已经过了 6 个小时,那么我服务的绝对时间是 6+9=15 个小时之后。,每个技师是3 小时,那么前面总共要 5 个技师,到了 15 以后就得技师6 来负责。
    那么,哪天晚上我累了,我想去按摩,假如现在是 21 点,我想预约 6 个小时后的按摩,就是6 个小时后我需要按摩,那么哪个技师为我按摩?
    技师 2 我们已知一天会有 8 个技师轮询,一天已经过了 21 点,而我是 6个小 时,那么假如从按摩店的角度去看的话,我要执行的决定时间 21+6 也就是 27 小时, 1 个技师的服务时间是 3 个小时,所以我前面 27 个小时需要 9 个技 师服务,那么你是 27 小时候,就得第 10 个技师,因为是每天轮询的,所以 第十个是技师 2

    时间轮算法流程图: https://www.processon.com/view/link/62b2d1751e08530728b1f954

    5.2 Redisson中的看门狗机制

    在抢占锁的时候,如果抢占成功,才会触发看门狗机制

     

    我们主要看newTimeout部分

    1. Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    2. public void run(Timeout timeout) throws Exception {
    3. RFuture future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
    4. future.addListener(new FutureListener() {
    5. public void operationComplete(Future future) throws Exception {
    6. RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
    7. if (!future.isSuccess()) {
    8. RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
    9. } else {
    10. if ((Boolean)future.getNow()) {
    11. RedissonLock.this.scheduleExpirationRenewal(threadId);
    12. }
    13. }
    14. }
    15. });
    16. }
    17. }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);

     run方法里面为任务,this.internalLockLeaseTime / 3L 代表是多长时间执行1次,等于10s,future中的任务是判断主线程中的业务key是否需要续期,并且重置为默认时间30s; 也就是说,10s会去监听一下执行该延时任务,如果该业务key还在,就续期。那么该延时任务究竟是怎么执行的呢?

    newTimeout源码 执行start方法,该start方法会另起一个守护线程,执行我们的task

     守护线程的真正执行方法在Work的run方法中

    1. public void run() {
    2. // 得到当前的系统时间
    3. HashedWheelTimer.this.startTime = System.nanoTime();
    4. if (HashedWheelTimer.this.startTime == 0L) {
    5. HashedWheelTimer.this.startTime = 1L;
    6. }
    7. // 得到了当前的系统时间后,主线程可以正常执行
    8. HashedWheelTimer.this.startTimeInitialized.countDown();
    9. int idx;
    10. HashedWheelBucket bucket;
    11. do {
    12. // 获取指针下一次走到的时间,如果没到下一次时间,就等待
    13. long deadline = this.waitForNextTick();
    14. if (deadline > 0L) {
    15. // 根据tick与轮的大小取模 得到当前tick所在的环的下标
    16. idx = (int)(this.tick & (long)HashedWheelTimer.this.mask);
    17. // 移除已经取消了的任务
    18. this.processCancelledTasks();
    19. // 根据idx下标 得到轮的hash桶
    20. bucket = HashedWheelTimer.this.wheel[idx];
    21. // 将队列的任务放到相关的hash桶,如果一个数组有多个任务,则采用链表的形式
    22. this.transferTimeoutsToBuckets();
    23. // 去执行hash桶下的任务
    24. bucket.expireTimeouts(deadline);
    25. ++this.tick;
    26. }
    27. } while(HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 1);
    28. HashedWheelBucket[] var5 = HashedWheelTimer.this.wheel;
    29. int var2 = var5.length;
    30. for(idx = 0; idx < var2; ++idx) {
    31. bucket = var5[idx];
    32. bucket.clearTimeouts(this.unprocessedTimeouts);
    33. }
    34. while(true) {
    35. HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
    36. if (timeout == null) {
    37. this.processCancelledTasks();
    38. return;
    39. }
    40. if (!timeout.isCancelled()) {
    41. this.unprocessedTimeouts.add(timeout);
    42. }
    43. }
    44. }

    5.2.1  waitForNextTick方法(获取指针走到下一次时间)

    1. private long waitForNextTick() {
    2. // 100ms走一次,得到下一次得deadLine
    3. long deadline = HashedWheelTimer.this.tickDuration * (this.tick + 1L);
    4. while(true) {
    5. // 得到当前时间跟开始时间的差值(时间轮走了多少时间)
    6. long currentTime = System.nanoTime() - HashedWheelTimer.this.startTime;
    7. //sleepTimeMs如果大于0,说明还没走到指针该走的时间,睡眠
    8. long sleepTimeMs = (deadline - currentTime + 999999L) / 1000000L;
    9. //如果到了时间,直接返回
    10. if (sleepTimeMs <= 0L) {
    11. if (currentTime == Long.MIN_VALUE) {
    12. return -9223372036854775807L;
    13. }
    14. return currentTime;
    15. }
    16. if (PlatformDependent.isWindows()) {
    17. sleepTimeMs = sleepTimeMs / 10L * 10L;
    18. if (sleepTimeMs == 0L) {
    19. sleepTimeMs = 1L;
    20. }
    21. }
    22. try {
    23. //如果没到指针时间,睡眠
    24. Thread.sleep(sleepTimeMs);
    25. } catch (InterruptedException var8) {
    26. if (HashedWheelTimer.WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == 2) {
    27. return Long.MIN_VALUE;
    28. }
    29. }
    30. }
    31. }

    5.2.2  transferTimeoutsToBuckets方法(给队列中的任务找对应的hash桶)

    1. private void transferTimeoutsToBuckets() {
    2. //从队列中获取任务,每次最多只能获取100000次
    3. for(int i = 0; i < 100000; ++i) {
    4. //从队列中获取task,任务会在主线程中添加到队列
    5. HashedWheelTimeout timeout = (HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll();
    6. if (timeout == null) {
    7. break;
    8. }
    9. if (timeout.state() != 1) {
    10. // 任务的deadline/每次指针时间
    11. // 从第一个starttime开始得到需要多少指针
    12. long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration;
    13. /轮数 默认512的环的大小,如果大于,则需要在第二次环的时候才会执行
    14. timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length;
    15. // 取2个的最大值,确保不会把时间安排在过去
    16. long ticks = Math.max(calculated, this.tick);
    17. //取模得到我的指针跳到哪个hash环执行
    18. int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask);
    19. //放到对应的hashWheeld的下标中
    20. HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex];
    21. //如果这个hash下标中有值,则添加至链表后面
    22. bucket.addTimeout(timeout);
    23. }
    24. }
    25. }

    5.2.3  expireTimeouts方法(限期执行hash桶下的任务)

    1. public void expireTimeouts(long deadline) {
    2. HashedWheelTimeout next;
    3. // 循环hash环中链表
    4. for(HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) {
    5. next = timeout.next;
    6. // 是不是当前轮应该执行的任务
    7. if (timeout.remainingRounds <= 0L) {
    8. /从链表清除
    9. next = this.remove(timeout);
    10. // 判断是否到了执行时间
    11. if (timeout.deadline > deadline) {
    12. throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
    13. }
    14. //调用timeout.expire方法,执行task
    15. timeout.expire();
    16. } else if (timeout.isCancelled()) {
    17. next = this.remove(timeout);
    18. } else {
    19. --timeout.remainingRounds;
    20. }
    21. }
    22. }

    expire执行task任务

    5.3 手动释放分布式锁

    KEYS[1] 为大 key KEYS[2] 为加锁的时候订阅的 channelName
    ARGV[1]=UNLOCK_MESSAGE , ARGV[2]=释放时间 ARGV[3]= 重入的线程信息
    1. if(redis.call('hexists',KEYS[1],ARGV[3])==0//如果小key不存在 返回空
    2. then
    3. return nil;
    4.    end;
    5. local counter=redis.call('hincrby',KEYS[1],ARGV[3],-1); //如果存在 可重入次数-1
    6. if(counter>0)  //如果大于0,说明加锁次数大于释放锁次数,不能释放锁
    7. then redis.call('pexpire',KEYS[1],ARGV[2]); //设置锁的过期时间
    8. return0;//返回0
    9. else
    10. redis.call('del',KEYS[1]); //删除大key
    11. redis.call('publish',KEYS[2],ARGV[1]);//往订阅的频道发送message,发送UNLOCK_MESSAGE
    12. return1;
    13. end;
    14. returnnil;

    5.4 联锁

    Redis做分布式遇到的问题?

    因为 Redis 属于 CAP 中的 AP, 为了优先保证高可用性,所以会牺牲一定的数据一致性。比如主从方案中,如果主库挂的话,从库是不管数据有没有同步完主库的数据,都会自动升级主。
    那么这样就会出现一种情况:加锁返回是成功的,但是由于发生了主库挂的,从库切换的时候,没有同步到这个锁,从而导致锁失效。
    那么这个怎么解决?解决不了,能做的是我尽可能的去减少这种情况!
    怎么解决?既然你一个主从实例,会导致锁丢失,那么假如说我把这种风险分担,同时在5 台机器加锁,这 5 台机器只要有一半以上的锁能成功获取,这样就尽可能的减少了锁丢失的场景。
    就相当于你家的门钥匙,假如钥匙只有一把,你丢了就丢了,但是把这个钥匙给你们家里面所有的人,那么你丢了家门还是能进去。因为家里其他人还有。

     实现方案: 给你同时往多个主节点+同一个锁 只要超过一半的节点能够去加锁成功,就成功

  • 相关阅读:
    HTML与CSS的初步解析及实践案例
    一种用于入侵检测的自适应集成机器学习模型
    MySQL 学习笔记(一)MySQL 事务的ACID特性
    提升MySQL查询效率及查询速度优化的4个方法
    手写RPC框架Feign
    eyb:员工管理准备到导入员工数据(四)
    详解ES6的Promise
    python如何调用另一个.py文件中的类和函数
    可燃气体报警器:户外工地安全预警先锋,定期检定保障安全无忧
    使用 golang 发送简单邮件
  • 原文地址:https://blog.csdn.net/still_five_Days/article/details/139329671