在传统的单体应用中,我们可以使用Java并发处理相关的API(如ReentrantLock或synchronized)来实现对共享资源的互斥控制,确保在高并发情况下同一时间只有一个线程能够执行特定方法。然而,随着业务的发展,单体应用逐渐演化为分布式系统,多线程、多进程分布在不同机器上,这导致了原有的单机部署下的并发控制策略失效。为了解决这一问题,我们需要引入一种跨JVM的互斥机制来管理共享资源的访问,这就是分布式锁所要解决的核心问题。
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
Redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。
在以下场景中:
上面的删除情况也无法保证原子性,只能通过lua脚本实现
如果redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。
在Redis中需要通过eval命令执行lua脚本
highlighter-
EVAL script numkeys key [key ...] arg [arg ...] script:lua脚本字符串,这段Lua脚本不需要(也不应该)定义函数。 numkeys:lua脚本中KEYS数组的大小 key [key ...]:KEYS数组中的元素 arg [arg ...]:ARGV数组中的元素
案列1:动态传参
lua
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 8 10 30 40 50 60 70 # 输出:8 10 60 70 EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20 # 输出:0 EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10 # 输出:1
案列2:执行redis类库方法
lua
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 bbb 20
可重入性是指一个线程在持有锁的情况下,可以多次获取同一个锁而不会发生死锁或阻塞的特性。在可重入锁中,线程可以重复获取已经持有的锁,每次获取都会增加一个计数器,直到计数器归零时才会真正释放锁。
下面是一个示例代码来说明可重入性:
java
public synchronized void a() { b(); } public synchronized void b() { // pass }
假设线程X在方法a中获取了锁后,继续执行方法b。如果这是一个不可重入的锁,线程X在执行b方法时将会被阻塞,因为它已经持有了该锁并且无法再次获取。这种情况下,线程X必须等待自己释放锁后才能再次争抢该锁。
而对于可重入性的情况,当线程X持有了该锁后,在遇到加锁方法时会直接将加锁次数加1,并继续执行方法逻辑。当退出加锁方法时,加锁次数再减1。只有当加锁次数归零时,该线程才会真正释放该锁。
因此,可重入性的最大特点就是计数器的存在,用于统计加锁的次数。在分布式环境中实现可重入分布式锁时也需要考虑如何正确统计和管理加锁次数。
Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的锁的重入次数,然后利用 lua 脚本判断逻辑。
lua
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('hincrby', KEYS[1], ARGV[1], 1); redis.call('expire', KEYS[1], ARGV[2]); return 1; else return 0; end
假设值为:KEYS:[lock], ARGV[uuid, expire]
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。
lua
-- 判断 hash set 可重入 key 的值是否等于 0 -- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败 -- 如果为 0 代表 可重入次数被减 1 -- 如果为 1 代表 该可重入 key 解锁成功 if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return nil; elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0; else redis.call('del', KEYS[1]); return 1; end;
如果锁不存在直接返回null,如果锁存在就对数量进行减一,如果减到等于0 就直接删除此锁
有可能代码没执行完毕,锁就到期了。基于上面这种情况需要对锁进行续期。使用定时器加lua脚本进行对锁续期
lua
if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], ARGV[2]); return 1; else return 0; end
考虑到分布式锁可能使用多种方式实现,比如Redis、mysql、zookeeper,所以暂时做成一个工厂类,按需使用。
以下是完整代码:
java
public class DistributedRedisLock implements Lock { private StringRedisTemplate redisTemplate; private String lockName; private String uuid; private long expire = 30; public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) { this.redisTemplate = redisTemplate; this.lockName = lockName; this.uuid = uuid + ":" + Thread.currentThread().getId(); } @Override public void lock() { this.tryLock(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { try { return this.tryLock(-1L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 加锁方法 * @param time * @param unit * @return * @throws InterruptedException */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if (time != -1){ this.expire = unit.toSeconds(time); } String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " redis.call('hincrby', KEYS[1], ARGV[1], 1) " + " redis.call('expire', KEYS[1], ARGV[2]) " + " return 1 " + "else " + " return 0 " + "end"; while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))){ Thread.sleep(50); } // 加锁成功,返回之前,开启定时器自动续期 this.renewExpire(); return true; } /** * 解锁方法 */ @Override public void unlock() { String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " + "then " + " return nil " + "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " + "then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid); if (flag == null){ throw new IllegalMonitorStateException("this lock doesn't belong to you!"); } } @Override public Condition newCondition() { return null; } private void renewExpire(){ String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " return redis.call('expire', KEYS[1], ARGV[2]) " + "else " + " return 0 " + "end"; new Timer().schedule(new TimerTask() { @Override public void run() { if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) { renewExpire(); } } }, this.expire * 1000 / 3); } }
DistributedLockClient
java
@Component public class DistributedLockClient { @Autowired private StringRedisTemplate redisTemplate; private String uuid; public DistributedLockClient() { this.uuid = UUID.randomUUID().toString(); } public DistributedRedisLock getRedisLock(String lockName){ return new DistributedRedisLock(redisTemplate, lockName, uuid); } }
在业务代码中使用:
java
public void deduct() { DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock"); redisLock.lock(); try { // 1. 查询库存信息 String stock = redisTemplate.opsForValue().get("stock").toString(); // 2. 判断库存是否充足 if (stock != null && stock.length() != 0) { Integer st = Integer.valueOf(stock); if (st > 0) { // 3.扣减库存 redisTemplate.opsForValue().set("stock", String.valueOf(--st)); } } } finally { redisLock.unlock(); } } 测试可重入性:
在Redis集群状态下可能出现的问题如下: