• Redisson分布式锁学习


           之前工作中一直使用redis来实现分布式锁,但是最近项目使用了云弹性,机器会涉及到扩缩容,涉及到优雅停机的问题,普通的redis分布锁,一般使用时会设置锁的时间,但是如果在加锁期间 JVM异常重启等发生会导致分布式锁得不到及时释放,即使机器重启,还是获取不到分布式锁。因此决定使用一下Redisson来解决这个问题。

    基于redis实现的分布式锁

    加锁代码如下:

    1. public boolean tryGlobalLock(String key, Integer expireSeconds) {
    2. Long resultLong = new Executor<Long>() {
    3. @Override
    4. public Long executor(String key, JedisCluster jedisCluster) {
    5. String status = jedisCluster.set(key, GLOBAL_LOCK_VALUE, SetParams.setParams()
    6. .nx()
    7. .ex(expireSeconds == null ? DEFAULT_LOCK_EXPIRE_TIME : expireSeconds));
    8. if ("OK".equalsIgnoreCase(status)) {
    9. // 第一次设置,设置成功
    10. return 1L;
    11. } else {
    12. // 已经存在这个key
    13. return 0L;
    14. }
    15. }
    16. }.run(key);
    17. return resultLong == 1L;
    18. }

    一般使用流程如下:

    1. // 尝试获取分布式锁
    2. // 如果获取失败 则直接返回
    3. // 如果获取成功
    4. // 执行业务逻辑
    5. // 业务逻辑执行成功 要释放锁
    6. // 业务逻辑执行失败 要释放锁

    如果在执行业务逻辑过程中 机器重启 优雅停机处理不合理 则会导致分布式锁不能及时释放,机器重启后,分布式锁仍获取不到,需要等待锁过期失效。

    基于redisson实现的分布式锁

    引入依赖

    1. <dependency>
    2. <groupId>org.redisson</groupId>
    3. <artifactId>redisson</artifactId>
    4. <version>3.17.5</version>
    5. </dependency>

    锁配置

    1. @Configuration
    2. @Slf4j
    3. public class RedissonConfig {
    4. private String nodesString = "";
    5. private String password = "";
    6. @Bean
    7. public Redisson redisson() {
    8. // 这里连接串是使用 逗号拼接的所以手动分隔一下
    9. String[] nodeArray = nodesString.split(",");
    10. Config config = new Config();
    11. // 使用redis集群配置
    12. ClusterServersConfig clusterServersConfig = config.useClusterServers();
    13. for (String node : nodeArray) {
    14. clusterServersConfig.addNodeAddress("redis://"+node);
    15. }
    16. try {
    17. clusterServersConfig.setPassword(password);
    18. } catch (Exception exception) {
    19. log.error("init redisson fail ",exception);
    20. }
    21. return (Redisson) Redisson.create(config);
    22. }
    23. }

    redisson分布式锁的使用很简单

    1. @Autowired
    2. private Redisson redisson;
    3. // 获取锁对象
    4. RLock lock = redisson.getLock(lockName);
    5. logger.info("try get lock start>>>> key = {} currentThread = {}", messageManagerVO.getMsgType(), Thread.currentThread().getName());
    6. try {
    7. // 在指定时间范围内尝试加锁
    8. boolean flag = lock.tryLock(tryGetSeconds, TimeUnit.SECONDS);
    9. logger.info("try get lock end>>>> key = {} flag = {} currentThread = {}", messageManagerVO.getMsgType(), flag, Thread.currentThread().getName());
    10. if (flag) {
    11. // 模拟事务处理逻辑
    12. Thread.sleep(doBizSeconds * 1000 * 60);
    13. // 释放锁
    14. lock.unlock();
    15. logger.info("try release lock end>>>> key = {}, currentThread = {}", messageManagerVO.getMsgType(), Thread.currentThread().getName());
    16. }
    17. } catch (InterruptedException e) {
    18. logger.info("RedissonService tryGlobalLock exception", e);
    19. }

    getLock方法获取锁对象

    tryLock方法尝试加锁 不需要配置锁过期时间 没有执行unlock方法之前 锁会自动续约 如果线程中断 则锁会自动释放

    unlock 释放锁

    加锁是以当前线程来加锁的,一但当前线程获取到 则其他线程不能获取锁。

    redisson源码简读

    加锁逻辑

    tryLock方法依次进入

    1. public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    2. return this.tryLock(waitTime, -1L, unit);
    3. }

    首先查看正常获取锁的逻辑

    1. long time = unit.toMillis(waitTime);
    2. long current = System.currentTimeMillis();
    3. long threadId = Thread.currentThread().getId();
    4. Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    5. if (ttl == null) {
    6. return true;
    7. }

    核心方法 tryAcquireAsync

    1. private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    2. RFuture ttlRemainingFuture;
    3. if (leaseTime > 0L) {
    4. ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    5. } else {
    6. ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    7. }
    8. CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
    9. if (ttlRemaining == null) {
    10. if (leaseTime > 0L) {
    11. this.internalLockLeaseTime = unit.toMillis(leaseTime);
    12. } else {
    13. this.scheduleExpirationRenewal(threadId);
    14. }
    15. }
    16. return ttlRemaining;
    17. });
    18. return new CompletableFutureWrapper(f);
    19. }

    其中方法

    1. RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
    2. return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    3. }

    可以看到实际上是异步执行一个redis lua脚本(Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval /evalsha 命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作)

    1. if (redis.call('exists', KEYS[1]) == 0)
    2. then redis.call('hincrby', KEYS[1], ARGV[2], 1);
    3. redis.call('pexpire', KEYS[1], ARGV[1]);
    4. return nil; end;
    5. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
    6. then redis.call('hincrby', KEYS[1], ARGV[2], 1);
    7. redis.call('pexpire', KEYS[1], ARGV[1]);
    8. return nil; end;
    9. return redis.call('pttl', KEYS[1]);

    其中脚本中涉及redis基本命令如下:

    EXISTS 命令用于检查给定 key 是否存在  若 key 存在返回 1 ,否则返回 0;

    Hincrby 命令用于为哈希表中的字段值加上指定增量值,如果哈希表的 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。如果指定的字段不存在,那么在执行命令前,字段的值被初始化为 0;

    PEXPIRE 命令以毫秒为单位设置 key 的生存时间 设置成功,返回 1 key 不存在或设置失败,返回 0;

    Hexists 命令用于查看哈希表的指定字段是否存在 如果哈希表含有给定字段,返回 1 。 如果哈希表不含有给定字段,或 key 不存在,返回 0;

    Pttl 命令以毫秒为单位返回 key 的剩余过期时间  当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1 。 否则,以毫秒为单位,返回 key 的剩余生存时间;

    参数含义如下:

    KEYS保存分布式锁的名称 

    ARGV[1]  对应KEYS过期时间  默认为30s

    ARGV[2] 对应线程ID

    1. // 如果第一次加锁 则key不存在 则创建key hashmap 并将线程ID 放入map中 设置为1 设置过期时间
    2. if (redis.call('exists', KEYS[1]) == 0)
    3. then redis.call('hincrby', KEYS[1], ARGV[2], 1);
    4. redis.call('pexpire', KEYS[1], ARGV[1]);
    5. return nil; end;
    6. // 如果key已经存在 并且map中含有线程ID 则将线程ID加一 实现可重入锁 设置过期时间
    7. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
    8. then redis.call('hincrby', KEYS[1], ARGV[2], 1);
    9. redis.call('pexpire', KEYS[1], ARGV[1]);
    10. return nil; end;
    11. // 返回锁的剩余时间
    12. return redis.call('pttl', KEYS[1]);

    释放锁逻辑

    unlock方法
    1. public void unlock() {
    2. try {
    3. this.get(this.unlockAsync(Thread.currentThread().getId()));
    4. } catch (RedisException var2) {
    5. if (var2.getCause() instanceof IllegalMonitorStateException) {
    6. throw (IllegalMonitorStateException)var2.getCause();
    7. } else {
    8. throw var2;
    9. }
    10. }
    11. }
    1. public RFuture<Void> unlockAsync(long threadId) {
    2. // 释放锁
    3. RFuture<Boolean> future = this.unlockInnerAsync(threadId);
    4. CompletionStage<Void> f = future.handle((opStatus, e) -> {
    5. // 取消锁的续约逻辑
    6. this.cancelExpirationRenewal(threadId);
    7. if (e != null) {
    8. throw new CompletionException(e);
    9. } else if (opStatus == null) {
    10. IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
    11. throw new CompletionException(cause);
    12. } else {
    13. return null;
    14. }
    15. });
    16. return new CompletableFutureWrapper(f);
    17. }

    主要包括释放锁和取消锁续约

    释放锁执行lua脚本

    1. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    2. return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
    3. }
    1. // 如果线程ID在map中不存在 则直接返回nil
    2. if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
    3. then return nil;end;
    4. // 如果线程ID在map中存在 则减一 返回当前对应的value值counter
    5. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    6. // 如果counter大于0 表示可重入锁没有全部释放完 则续约
    7. if (counter > 0)
    8. then redis.call('pexpire', KEYS[1], ARGV[2]);
    9. return 0;
    10. else
    11. 如果 counter=0 表示锁已经不存在 则直接删除key
    12. redis.call('del', KEYS[1]);
    13. redis.call('publish', KEYS[2], ARGV[1]);
    14. return 1; end;
    15. return nil;

    锁续约逻辑

    redisson中数据结构(map)如下:

    lockName:  过期时间

          线程ID  线程重入次数

    由加锁逻辑可知  默认锁的过期时间为30s 后续会不断进行续约 保证锁不会释放

    tryAcquireAsync方法中加锁之后会进行锁的续约
    1. private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    2. RFuture ttlRemainingFuture;
    3. if (leaseTime > 0L) {
    4. ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    5. } else {
    6. ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    7. }
    8. CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
    9. if (ttlRemaining == null) {
    10. if (leaseTime > 0L) {
    11. this.internalLockLeaseTime = unit.toMillis(leaseTime);
    12. } else {
    13. this.scheduleExpirationRenewal(threadId);
    14. }
    15. }
    16. return ttlRemaining;
    17. });
    18. return new CompletableFutureWrapper(f);
    19. }

    进入方法scheduleExpirationRenewal

    1. protected void scheduleExpirationRenewal(long threadId) {
    2. ExpirationEntry entry = new ExpirationEntry();
    3. ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    4. if (oldEntry != null) {
    5. oldEntry.addThreadId(threadId);
    6. } else {
    7. entry.addThreadId(threadId);
    8. try {
    9. this.renewExpiration();
    10. } finally {
    11. if (Thread.currentThread().isInterrupted()) {
    12. this.cancelExpirationRenewal(threadId);
    13. }
    14. }
    15. }
    16. }

    继续进入renewExpiration方法

    1. private void renewExpiration() {
    2. ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    3. if (ee != null) {
    4. Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    5. public void run(Timeout timeout) throws Exception {
    6. ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
    7. if (ent != null) {
    8. Long threadId = ent.getFirstThreadId();
    9. if (threadId != null) {
    10. CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
    11. future.whenComplete((res, e) -> {
    12. if (e != null) {
    13. RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);
    14. RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
    15. } else {
    16. if (res) {
    17. RedissonBaseLock.this.renewExpiration();
    18. } else {
    19. RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
    20. }
    21. }
    22. });
    23. }
    24. }
    25. }
    26. }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
    27. ee.setTimeout(task);
    28. }
    29. }

    锁续约的方法renewExpirationAsync

    1. protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    2. return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
    3. }

    LUA脚本

    1. // 如果缓存中map含有当前线程ID 则重置缓存失效时间 默认30s
    2. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
    3. then redis.call('pexpire', KEYS[1], ARGV[1]);
    4. return 1; end;
    5. return 0;

    取消锁续约逻辑

    1. protected void cancelExpirationRenewal(Long threadId) {
    2. ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    3. if (task != null) {
    4. if (threadId != null) {
    5. task.removeThreadId(threadId);
    6. }
    7. if (threadId == null || task.hasNoThreads()) {
    8. Timeout timeout = task.getTimeout();
    9. if (timeout != null) {
    10. timeout.cancel();
    11. }
    12. EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
    13. }
    14. }
    15. }

    redisson围绕map 线程ID 重入次数 数据结构来通过lua脚本原子执行来保证分布式锁。其功能很强大 ,可以实现其他公平锁 读写锁等功能,后面可以深入了解一下。

    参考资料

    最强分布式锁工具:Redisson

    https://github.com/redisson/redisson

  • 相关阅读:
    笔试面试相关记录(9)
    基于模型预测人工势场的船舶运动规划方法,考虑复杂遭遇场景下的COLREG(Matlab代码实现)
    java计算机毕业设计springboot+vue航空公司电子售票系统-机票预订系统
    编译 qsqlmysql.dll QMYSQL driver not loaded
    麦克纳姆轮x运动学分析
    回归算法的评估指标
    CSS 元素的显示与隐藏
    S-2K2001 pmon调试
    Android安全机制介绍及实践
    [附源码]计算机毕业设计springboot面向高校活动聚App
  • 原文地址:https://blog.csdn.net/u014106644/article/details/133969988