• Java 中通过 key 获取锁


    一、概览

    本文我们将了解如何通过特定键获取锁,以保证该键上的操作的线程安全,并且不妨碍其他键。
    一般来说,我们需要实现两个方法:

    void lock(String key)
    void unlock(String key)
    
    • 1
    • 2

    本文以字符串作为键为例,大家可以根据实际需要改造成任意类型的键,重写 equas 和 hashCode 方法,保证唯一性即可。

    二、简单的互斥锁

    假设需要满足当前线程获取锁则需要执行特定代码,否则不执行这个场景。
    我们可以维护一系列 Key 的 Set, 在使用时添加到 Set 中,解锁时移除对应的 Key。
    此时,需要考虑线程安全问题。因此需要使用线程安全的 Set 实现,如基于 ConcurrentHashMap 的线程安全 Set。

    public class SimpleExclusiveLockByKey {
    
        private static Set<String> usedKeys= ConcurrentHashMap.newKeySet();
        
        public boolean tryLock(String key) {
            return usedKeys.add(key);
        }
        
        public void unlock(String key) {
            usedKeys.remove(key);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    使用案例:

    String key = "key";
    SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
    try {
        lockByKey.tryLock(key);
        // 在这里添加对该 key 获取锁之后要执行的代码
    } finally { // 非常关键
        lockByKey.unlock(key);
    }
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    注意一定要在 finally 代码块中解锁,以保证即便发生异常时,也可以正常解锁。

    三、按键来获取和释放锁

    以上代码可以保证获取锁后才执行,但无法实现未拿到锁的线程等待的效果。
    有时候,我们需要让未获取到对应锁的线程等待。
    流程如下:

    • 第一个线程获取某个 key 的锁
    • 第二个线程获取同一个 key 的锁,第二个线程需要等待
    • 第一个线程释放某个 key 的锁
    • 第二个线程获取该 key 的锁,然后执行其代码

    3.1 使用线程计数器定义 Lock

    我们可以使用 ReentrantLock 来实行线程阻塞。
    我们通过内部类来封装 Lock。该类统计某个 key 上执行的线程数。暴露两个方法,一个是线程数增加,一个是减少线程数。

    private static class LockWrapper {
        private final Lock lock = new ReentrantLock();
        private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
    
        private LockWrapper addThreadInQueue() {
            numberOfThreadsInQueue.incrementAndGet(); 
            return this;
        }
    
        private int removeThreadFromQueue() {
            return numberOfThreadsInQueue.decrementAndGet(); 
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.2 处理排队的线程

    接下来继续使用 ConcurrentHashMap , key 作为键, LockWrapper 作为值。
    保证同一个 key 使用同一个 LockWrapper 中的同一把锁。

    private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
    
    • 1

    一个线程想要获取某个 key 的锁时,需要看该 key 对应的 LockWrapper 是否已经存在。

    • 如果不存在,创建一个 LockWrapper ,计数器设置为1
    • 如果存在,对应的 LockWrapper 加1
    public void lock(String key) {
        LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
        lockWrapper.lock.lock();
    }
    
    • 1
    • 2
    • 3
    • 4

    3.3 解锁和移除 Entry

    解锁时将等待的队列减一。
    当前 key 对应的线程数为 0 时,可以将其从 ConcurrentHashMap 中移除。

    public void unlock(String key) {
        LockWrapper lockWrapper = locks.get(key);
        lockWrapper.lock.unlock();
        if (lockWrapper.removeThreadFromQueue() == 0) { 
            // NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
            locks.remove(key, lockWrapper);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.4 总结

    最终效果如下:

    public class LockByKey {
        
        private static class LockWrapper {
            private final Lock lock = new ReentrantLock();
            private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
            
            private LockWrapper addThreadInQueue() {
                numberOfThreadsInQueue.incrementAndGet(); 
                return this;
            }
            
            private int removeThreadFromQueue() {
                return numberOfThreadsInQueue.decrementAndGet(); 
            }
            
        }
        
        private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
        
        public void lock(String key) {
            LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
            lockWrapper.lock.lock();
        }
        
        public void unlock(String key) {
            LockWrapper lockWrapper = locks.get(key);
            lockWrapper.lock.unlock();
            if (lockWrapper.removeThreadFromQueue() == 0) { 
                // NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
                locks.remove(key, lockWrapper);
            }
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    使用示例:

    String key = "key"; 
    LockByKey lockByKey = new LockByKey(); 
    try { 
        lockByKey.lock(key);
        // insert your code here 
    } finally { // CRUCIAL 
        lockByKey.unlock(key); 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    四、允许同一个 key 同时多个线程运行

    我们还需要考虑另外一种场景: 前面对于同一个 key 同一时刻只允许一个线程执行。如果我们想实现,对于同一个 key ,允许同时运行 n 个线程该怎么办?
    为了方便理解,我们假设同一个 key 允许两个线程。

    • 第一个线程想要获取 某个 key 的锁,允许
    • 第二个线程也想要获取该 key 的锁,允许
    • 第三个线程也想获取该 key 的锁,该线程需要等待第一个或第二个线程释放锁之后才可以执行

    Semaphore 很适合这种场景。Semaphore 可以控制同时运行的线程数。

    public class SimultaneousEntriesLockByKey {
    
        private static final int ALLOWED_THREADS = 2;
        
        private static ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
        
        public void lock(String key) {
            Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
            semaphore.acquireUninterruptibly();
        }
        
        public void unlock(String key) {
            Semaphore semaphore = semaphores.get(key);
            semaphore.release();
            if (semaphore.availablePermits() == ALLOWED_THREADS) { 
                semaphores.remove(key, semaphore);
            }  
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    使用案例:

    String key = "key"; 
    SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey(); 
    try { 
        lockByKey.lock(key); 
        // 在这里添加对该 key 获取锁之后要执行的代码
    } finally { // 非常关键
        lockByKey.unlock(key); 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    五、结论

    本文演示如何对某个 key 加锁,以保证对该 key 的并发操作限制,可以实现同一个 key 一个或者多个线程同时执行。
    相关代码:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-advanced-4

    原文:https://www.baeldung.com/java-acquire-lock-by-key

  • 相关阅读:
    【STM32】GPIO控制LED(寄存器版)
    Spring Retry 在SpringBoot 中的应用
    信号的处理时机(内核态,用户态,如何/为什么相互转换,内核空间,cpu寄存器),信号的处理流程详细介绍+抽象图解
    【Java面试】生产环境服务器变慢,如何诊断处理?
    Unity构建详解(10)——Unity构建流程
    makefile 自动生成依赖关系-笔记
    新学期如何克服“社恐”,猿辅导老师给高中生三条建议
    使用信号量解决并发问题
    【副业合集】60个正规可做兼职的网站
    手机微信里面的文件打印步骤
  • 原文地址:https://blog.csdn.net/w605283073/article/details/127858281