• redisson公平锁与非公平原理


    加锁

     private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
            long threadId = Thread.currentThread().getId();
            //1.尝试加锁,如果加锁成功直接返回
            Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
            if (ttl == null) {
                return;
            }
    		
    		//2.订阅
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            
            if (interruptibly) {
                commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                commandExecutor.syncSubscription(future);
            }
    
    		//3.循环并尝试加锁
            try {
                while (true) {
                    ttl = tryAcquire(-1, leaseTime, unit, threadId);
                    if (ttl == null) {
                        break;
                    }
    
                    if (ttl >= 0) {
                        try {
                            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            if (interruptibly) {
                                throw e;
                            }
                            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else {
                        if (interruptibly) {
                            future.getNow().getLatch().acquire();
                        } else {
                            future.getNow().getLatch().acquireUninterruptibly();
                        }
                    }
                }
            } finally {
                unsubscribe(future, threadId);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    1.尝试加锁,如果加锁成功直接返回
    2.订阅一个主题,这个主题与解锁有关
    3.失败后会,循环加锁,这里每次加锁都会返回一个ttl(下次加锁的等待时间)
    但是根据后面的代码可以返现,ttl是可以>0 也是 可以<0 的,从这里就可以猜出来,这是一种策略
    如果ttl>0,那么就等待ttl。如果<0 ,这有可能是需要等待的时间过于长,或者ttl无法精确计算等等导致,为了避免过多的请求锁,那么就进入无限期的等待,等待被其他线程唤醒,至于具体的策略还是要看 ttl 具体是如何计算的,但是从代码就足以说明,是可能出现ttl<0的情况,然而这个特性就可能导致上线的时候出现死锁问题

    tryAcquireAsync(…)

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
            RFuture<Long> ttlRemainingFuture;
            //加锁的核心逻辑
            if (leaseTime != -1) {
                ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            } else {
                ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            }
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e != null) {
                    return;
                }
    
    			//看门狗
                if (ttlRemaining == null) {
                    if (leaseTime != -1) {
                        internalLockLeaseTime = unit.toMillis(leaseTime);
                    } else {
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    1.看门狗:定时任务去更新key的时间,防止业务还没执行完就自动解锁了

    公平锁核心逻辑

    一.核心数据结构
    anyLock:Hash类型,锁的名字
    redisson_lock_queue:{anyLock}:list类型,用于线程排队
    redisson_lock_timeout:{anyLock}:zset类型,score 表示等待线程的超时时间戳。
    二.参数
    KEYS[1]:锁名称
    KEYS[2]:redisson_lock_queue:{anyLock};
    KEYS[3]:redisson_lock_timeout:{anyLock}

    ARGV[1]:锁超时时间
    ARGV[2]:线程唯一标识
    ARGV[3]:threadWaitTime 默认 300000
    ARGV[4]: 当前时间戳

    					//移除超时线程
     					"while true do " +
     						//获取第一个线程
                            "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                            "if firstThreadId2 == false then " +
                                "break;" +
                            "end;" +
    
    						//获取超时的时间戳timeout,如果timeout
                            "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                            "if timeout <= tonumber(ARGV[4]) then " +
                                "redis.call('zrem', KEYS[3], firstThreadId2);" +
                                "redis.call('lpop', KEYS[2]);" +
                            "else " +
                                "break;" +
                            "end;" +
                        "end;" +
    
    					//是否能获取到锁
    					//条件:1.anyLock不存在
    					//	   2.当前线程排在队首
                        "if (redis.call('exists', KEYS[1]) == 0) " +
                            "and ((redis.call('exists', KEYS[2]) == 0) " +
                                "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
     
     						//list队列移除当前线程,zset移除当前线程
                            "redis.call('lpop', KEYS[2]);" +
                            "redis.call('zrem', KEYS[3], ARGV[2]);" +
    
    						//更新所有线程的等待时间
    	                   "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
                            "for i = 1, #keys, 1 do " +
                                "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
                            "end;" +
    
    						//设置重入信息和超时时间
                            "redis.call('hset', KEYS[1], ARGV[2], 1);" +
                            "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                            "return nil;" +
                        "end;" +
    
                        // 处理重入锁
                        "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
                            "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                            "return nil;" +
                        "end;" +
    
                        //走到这里说明加锁失败,判断当前线程是否入队,如果入队了,返回一个等待时间
                        "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
                        "if timeout ~= false then " +
                            // the real timeout is the timeout of the prior thread
                            // in the queue, but this is approximately correct, and
                            // avoids having to traverse the queue
                            "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
                        "end;" +
    
                       //将当前线程添加队列,同时记录超时信息,计算出一个等待时间返回
                        "local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
                        "local ttl;" +
                        "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
                            "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
                        "else " +
                            "ttl = redis.call('pttl', KEYS[1]);" +
                        "end;" +
                        "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
                        "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                            "redis.call('rpush', KEYS[2], ARGV[2]);" +
                        "end;" +
                        "return ttl;",
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    1.先移除线程
    2.尝试获取锁,能获取到的话会将当前线程的信息移除,同时更新超时时间
    3.处理冲入锁
    4.如果没有获取锁,且当前线程在队列中,返回一个等待时间
    5.计算等待时间,如果当前线程没有入队,则入队且记录超时信息

    公平锁解锁

    			  //剔除超时的线程,与加锁部分类似
    			  "while true do "
                    + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                    + "if firstThreadId2 == false then "
                        + "break;"
                    + "end; "
                    + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                    + "if timeout <= tonumber(ARGV[4]) then "
                        + "redis.call('zrem', KEYS[3], firstThreadId2); "
                        + "redis.call('lpop', KEYS[2]); "
                    + "else "
                        + "break;"
                    + "end; "
                  + "end;"
                    
                   
                  //如果当前线程不存在,发布释放锁的消息,通知其他线程解锁
                  + "if (redis.call('exists', KEYS[1]) == 0) then " + 
                        "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                        "if nextThreadId ~= false then " +
                            "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                        "end; " +
                        "return 1; " +
                    "end;" +
    
    				
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
    
    				//处理重入锁
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "end; " +
                        
                     //删除当前线程占用的锁,同时通知其他线程来获取锁
                    "redis.call('del', KEYS[1]); " +
                    "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                    "if nextThreadId ~= false then " +
                        "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                    "end; " +
                    "return 1;"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1.剔除超时线程
    2.处理重入锁
    3.删除当前线程占用的锁,同时发布解锁的消息

    非公平锁加锁

    非公平锁就显得非常简单了,带过一下

    						//如果不存在则添加当前线程
    						"if (redis.call('exists', KEYS[1]) == 0) then " +
                            	"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            	"redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        	    "return nil; " +
                            "end; " +
                            //处理重入锁
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            	"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                           	    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    非公平锁解锁

    						//当前线程没有持有锁直接返回
    						"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                            	"return nil;" +
                           	 "end; " +
                           	 //处理重入锁
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                            "if (counter > 0) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                "return 0; " +
                            "else " +
    
    						//删除当前线程key,同时发布解锁的消息
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                            "end; " +
                            "return nil;"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
  • 相关阅读:
    037、目标检测-算法速览
    Python实战小项目分享
    Ubuntu切换不同版本的cuda
    西华大学计算机考研资料汇总
    redis中value/String
    8年测试工程师总结出来的《测试核心价值》与《0基础转行软件测试超全学习指南》
    pytorch3d安装遇到的一些坑和解决过程
    8.反悔贪心
    python 自定义异常/raise关键字抛出异常
    jmeter通过beanshell获取结果,并写入txt文件
  • 原文地址:https://blog.csdn.net/kznsbs/article/details/125972755