锁实现 | 实现方式 | 性能 | 选型注意 | 选择关注点 |
mysql | 好 | 并发场景锁无效 | 低成本实现 | |
悲观锁 | 差 | 可能导致锁表 | 极端场景 | |
zk | 顺序节点 | 中 | 性能、可靠一般 | 性能、可靠的兼顾选择 |
redis | setNx | 低 | 锁没有唯一标示 | 简单但不推荐 |
lua脚本 | 最高 | 用不好效果更差 | 大神选用不是大神用redisson | |
redisson | 中高 | 平衡做的好 | 关注性能 |
悲观锁实现:悲观锁实现简单 select * from table for update,悲观锁是所有锁中理论最靠谱的一种,但是性能差。在并发场景不推荐使用;
org.apache.curator -
curator-recipes -
2.12.0 -
- /**
- * 功能:zk - zk Curator客户端 - 实现分布式锁测试
- * 作者:丁志超
- */
- public class ZkCuratorLock{
- //实例化客户端
- private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
- private static CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString("ip:2181")
- .sessionTimeoutMs(3000)
- .connectionTimeoutMs(5000)
- .retryPolicy(retryPolicy)
- .build();
- //zk分布式锁创建节点在零时目录zklock下创建
- static String lockPath = "/zklock";
- //实例化分布式锁
- final static InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
- public static void main(String[] args) {
- //获取锁
- try {
- lock.acquire();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }finally {
- //释放锁
- try {
- lock.release();
- } catch (Exception e) {
- }
- }
- }
- }
- //1、Curator锁数据结构
- private static class LockData
- {
- //当前拥有锁的线程
- final Thread owningThread;
- //当前锁的路径
- final String lockPath;
- //锁计数器
- final AtomicInteger lockCount = new AtomicInteger(1);
- }
- //2、这段是Curator的精华 - 获取锁成功后验证 及 获取锁失败后等待
- private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
- {
- boolean haveTheLock = false;
- boolean doDelete = false;
- try
- {
- if ( revocable.get() != null )
- {
- client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
- }
- while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
- {
- List
children = getSortedChildren(); - String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
- //成功获取锁
- PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
- if ( predicateResults.getsTheLock() )
- {
- haveTheLock = true;
- }
- else
- {
- //没有获取锁,监听拥有锁节点的变化
- String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
- //获取锁失败后等待超时如果不设置超时一直等待
- synchronized(this)
- {
- try
- {
- // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
- client.getData().usingWatcher(watcher).forPath(previousSequencePath);
- if ( millisToWait != null )
- {
- millisToWait -= (System.currentTimeMillis() - startMillis);
- startMillis = System.currentTimeMillis();
- if ( millisToWait <= 0 )
- {
- doDelete = true; // timed out - delete our node
- break;
- }
- wait(millisToWait);
- }
- else
- {
- wait();
- }
- }
- catch ( KeeperException.NoNodeException e )
- {
- // it has been deleted (i.e. lock released). Try to acquire again
- }
- }
- }
- }
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- doDelete = true;
- throw e;
- }
- finally
- {
- if ( doDelete )
- {
- deleteOurPath(ourPath);
- }
- }
- return haveTheLock;
- }
- //3、获取锁算法思想 - maxLeases默认值是1,要求获取锁的线程永远是list的第一个线程,保证获取锁顺序性
- public PredicateResults getsTheLock(CuratorFramework client, List
children, String sequenceNodeName, int maxLeases) throws Exception - {
- int ourIndex = children.indexOf(sequenceNodeName);
- validateOurIndex(sequenceNodeName, ourIndex);
- boolean getsTheLock = ourIndex < maxLeases;
- String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
- return new PredicateResults(pathToWatch, getsTheLock);
- }
- //setNx实现分布式锁
- public class SetNxLock {
- public static void main(String[] args) {
- Jedis jedis = new Jedis("localhost");
- jedis.setnx("key", "value");
- try {
- if(jedis.exists("key")) {
- jedis.expire("key", 10);
- System.out.println("我获取了锁,干点活!");
- jedis.del("key");
- }
- } catch (Exception e) {
- } finally {
- jedis.del("key");
- }
- }
- }
- /**
- * 功能:redis - lua - lua实现分布式锁 使用redis.clients客户端
- * 作者:丁志超
- */
- public class LuaLock {
- public static void main(String[] args) {
- lock("122333", "33331","10000" );
- unlock("122333", "33331");
- }
- /**
- * 加锁语法
- * key:redis key
- * value:redis value
- * time: redis timeouts 锁过期时间一般大于最耗时的业务消耗的时间
- * 语法参考文档:https://www.runoob.com/redis/redis-scripting.html
- * */
- public static String lock(String key, String value,String timeOut ) {
- /**
- * -- 加锁脚本,其中KEYS[]为外部传入参数
- * -- KEYS[1]表示key
- * -- ARGV[1]表示value
- * -- ARGV[2]表示过期时间
- */
- String lua_getlock_script = "if redis.call('SETNX','"+key+"','"+value+"') == 1 then" +
- " return redis.call('pexpire','"+key+"','"+timeOut+"')" +
- " else" +
- " return 0 " +
- "end";
- Jedis jedis = new Jedis("localhost");
- //在缓存中添加脚本但不执行
- String scriptId = jedis.scriptLoad(lua_getlock_script);
- //查询脚本是否添加
- Boolean isExists = jedis.scriptExists(scriptId);
- //执行脚本 返回1表示成功,返回0表示失败
- Object num = jedis.eval(lua_getlock_script);;
- return String.valueOf(num);
- }
- /**
- * 释放锁语法
- * key:redis key
- * value:redis value
- * time: redis timeouts 锁过期时间一般大于最耗时的业务消耗的时间
- * 语法参考文档:https://www.runoob.com/redis/redis-scripting.html
- * */
- public static String unlock(String key, String value ) {
- /**
- * -- 加锁脚本,其中KEYS[]为外部传入参数
- * -- KEYS[1]表示key
- * -- ARGV[1]表示value
- * -- ARGV[2]表示过期时间
- */
- String lua_unlock_script =
- "if redis.call('get','"+key+"') == '"+value+"' then " +
- " return redis.call('del','"+key+"') " +
- "else return 0 " +
- "end";
- Jedis jedis = new Jedis("localhost");
- //在缓存中添加脚本但不执行
- String scriptId = jedis.scriptLoad(lua_unlock_script);
- //查询脚本是否添加
- Boolean isExists = jedis.scriptExists(scriptId);
- //执行脚本 返回1表示成功,返回0表示失败
- Object num = jedis.eval(lua_unlock_script);;
- return String.valueOf(num);
- }
- }
- /**
- * 功能:redis - Redisson - Redisson实现分布式锁
- * 作者:丁志超
- */
- public class RedissonLock {
- public static void main(String[] args) {
- Config config = new Config();
- config.useSingleServer().setAddress("localhost");
- RedissonClient redissonClient = Redisson.create(config);
- RLock rLock = redissonClient.getLock("key");
- try {
- rLock.tryLock(10, TimeUnit.SECONDS);
- System.out.println("我获取了锁,该我干活了。");
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }finally {
- if(rLock.isLocked()) {
- rLock.unlock();
- }
- }
- }
- }
SET resource_name my_random_value NX PX 30000
- if redis.call("get",KEYS[1]) == ARGV[1] then
- return redis.call("del",KEYS[1])
- else
- return 0
- end
- //redisson锁实体数据结构
- public class RedissonLockEntry {
- //计数器
- private int counter;
- //信号类 控制多少线程同时获取锁
- private final Semaphore latch;
- private final RPromise
promise; - //线程队列
- private final ConcurrentLinkedQueue
listeners = new ConcurrentLinkedQueue(); - }
- //源码类位置 redisson RedissonLock.class
- //redisson实现分布式锁源码解析
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
- long time = unit.toMillis(waitTime);
- long current = System.currentTimeMillis();
- long threadId = Thread.currentThread().getId();
- //尝试获取锁其实现见后面的方法
- Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return true;
- }
- //如果锁超时直接返回失败
- time -= System.currentTimeMillis() - current;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- current = System.currentTimeMillis();
- //通过线程ID获取锁结构体
- RFuture
subscribeFuture = subscribe(threadId); - if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
- if (!subscribeFuture.cancel(false)) {
- subscribeFuture.onComplete((res, e) -> {
- if (e == null) {
- unsubscribe(subscribeFuture, threadId);
- }
- });
- }
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- try {
- //再次检查锁是否超时,如果超时释放锁
- time -= System.currentTimeMillis() - current;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- //在非超时周期内通过自旋方式获取锁
- while (true) {
- long currentTime = System.currentTimeMillis();
- ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return true;
- }
- //超时释放锁
- time -= System.currentTimeMillis() - currentTime;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- // waiting for message
- currentTime = System.currentTimeMillis();
- if (ttl >= 0 && ttl < time) {
- subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
- time -= System.currentTimeMillis() - currentTime;
- if (time <= 0) {
- acquireFailed(waitTime, unit, threadId);
- return false;
- }
- }
- } finally {
- unsubscribe(subscribeFuture, threadId);
- }
- // return get(tryLockAsync(waitTime, leaseTime, unit));
- }
- //redisson获取锁最底层实现,使用lua脚本实现,如果有key返回key的剩余生命时间
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
- //若 key 存在返回 1 ,否则返回 0
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- //给key增加超时时间
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- //设置key的生命周期
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- //查询key存在
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
- }
- //释放锁
- @Override
- protected RFuture
acquireFailedAsync(long waitTime, TimeUnit unit, long threadId) { - long wait = threadWaitTime;
- if (waitTime != -1) {
- wait = unit.toMillis(waitTime);
- }
- //这块看的有点懵,等学习lua在看
- return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
- // 移除list超时的key及对应的线程
- "local queue = redis.call('lrange', KEYS[1], 0, -1);" +
- // find the location in the queue where the thread is
- "local i = 1;" +
- "while i <= #queue and queue[i] ~= ARGV[1] do " +
- "i = i + 1;" +
- "end;" +
- // go to the next index which will exist after the current thread is removed
- "i = i + 1;" +
- // decrement the timeout for the rest of the queue after the thread being removed
- "while i <= #queue do " +
- "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), queue[i]);" +
- "i = i + 1;" +
- "end;" +
- // remove the thread from the queue and timeouts set
- //移除超时的线程
- "redis.call('zrem', KEYS[2], ARGV[1]);" +
- "redis.call('lrem', KEYS[1], 0, ARGV[1]);",
- Arrays.
- getLockName(threadId), wait);
- }
redis看门狗机制就是锁过期时间自动续期的一种自动检查机制,具体涉及下面2个方法。 看门狗机制采用续期的方式保证了方法一定会执行完毕,但是会导致系统的并发大大降低。
2、lock(long leaseTime, TimeUnit unit):指定过期时间,超过有效期时间后,会自动释放锁
leaseTime:必须是 -1 才会开启 Watch Dog 机制,如果需要开启 Watch Dog 机制就必须使用默认的加锁时间为 30s。 如果你自己自定义时间,超过这个时间,锁就会自定释放,并不会自动续期。
- public void lock() {
- try {
- // 过期时间为-1,表示永不过期
- lock(-1, null, false);
- } catch (InterruptedException e) {
- throw new IllegalStateException();
- }
- }
- private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
- long threadId = Thread.currentThread().getId();
- Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
- if (ttl == null) {
- // 获取到锁直接返回
- return;
- }
- //还未获取到锁
- // 订阅锁,这样锁释放时会被通知到
- RFuture
future = subscribe(threadId); - if (interruptibly) {
- commandExecutor.syncSubscriptionInterrupted(future);
- } else {
- commandExecutor.syncSubscription(future);
- }
- try {
- while (true) {
- ttl = tryAcquire(-1, leaseTime, unit, threadId);
- if (ttl == null) {
- // 获取到锁则可以退出死循环
- break;
- }
- if (ttl >= 0) {
- try {
- // 指定超时时间内获取
- future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- if (interruptibly) {
- throw e;
- }
- future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- }
- } else {
- // 未指定超时时间获取
- if (interruptibly) {
- future.getNow().getLatch().acquire();
- } else {
- future.getNow().getLatch().acquireUninterruptibly();
- }
- }
- }
- } finally {
- // 取消锁的订阅
- unsubscribe(future, threadId);
- }
- // get(lockAsync(leaseTime, unit));
- }
- private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
- return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
- }
- private
RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { - RFuture
ttlRemainingFuture; - if (leaseTime != -1) {
- // 指定过期时间
- ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
- } else {
- // 未指定过期时间
- // 过期时间设为看门狗超时时间,然后由看门狗一直续期,直到锁释放
- ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
- TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
- }
- //看门狗实现核心回调scheduleExpirationRenewal自动给锁续期
- ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
- if (e != null) {
- return;
- }
- if (ttlRemaining == null) {
- // 获取到锁,不会定时续期
- if (leaseTime != -1) {
- internalLockLeaseTime = unit.toMillis(leaseTime);
- } else {
- // 未指定过期时间,需要开启Watchdog自动续期
- scheduleExpirationRenewal(threadId);
- }
- }
- });
- return ttlRemainingFuture;
- }
- //首先看下尝试获取锁的实现,tryLockInnerAsync方法通过EVAL执行LUA脚本,代码如下:
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { - return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
- "redis.call('pexpire', KEYS[1], ARGV[1]); " +
- "return nil; " +
- "end; " +
- "return redis.call('pttl', KEYS[1]);",
- Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
- }
在算法的分布式版本中,我们假设我们有N个 Redis 主节点,这些节点是完全独立的。上面已经介绍单个节点如何获取释放锁。那么在集群中每台独立的redis也会使用这种方式获取释放锁。我们假设5台redis节点,这个数值不是固定的是业务需要的选择。
2、它尝试顺序获取所有 N 个实例中的锁,在所有实例中使用相同的键名和随机值。在第 2 步中,当在每个实例中设置锁时,客户端使用一个比总锁自动释放时间更小的超时来获取它。例如,如果自动释放时间为 10 秒,则超时可能在 ~ 5-50 毫秒范围内。这可以防止客户端长时间处于阻塞状态,试图与已关闭的 Redis 节点通信:如果实例不可用,我们应该尽快尝试与下一个实例通信。
3、客户端通过从当前时间中减去步骤 1 中获得的时间戳来计算获取锁所用的时间。当且仅当客户端能够在大多数实例(至少 3 个)中获取锁,并且获取锁所用的总时间小于锁有效时间,则认为该锁已获取。
4、如果获得了锁,则其有效时间被认为是初始有效时间减去经过的时间,如步骤 3 中计算的那样。
5、如果客户端由于某种原因获取锁失败(或者它无法锁定 N/2+1 个实例或有效时间为负),它将尝试解锁所有实例(即使是它认为没有锁定的实例)能够锁定)。
4、redis获取锁的有效时间等于锁有效时间减去获取锁花费的时间在减去集群节点时间差。怎么理解?木桶效应比如锁生命周期是10秒,最快节点1秒获取,最慢的3秒获取。此时锁生命周期就是 10-(3-1)=8在减去获取锁过程的时间,屏蔽集群的时间差异。
5、无法获取N/2+1 个实例或获取锁超时即获取锁失败。
算法安全吗?我们可以尝试了解在不同场景中会发生什么。首先让我们假设客户端能够在大多数情况下获取锁。所有实例都将包含一个具有相同生存时间的密钥。但是,密钥是在不同的时间设置的,因此密钥也会在不同的时间到期。但是如果第一个键在时间 T1(我们在联系第一台服务器之前采样的时间)设置为最差,而最后一个键在时间 T2(我们从最后一个服务器获得回复的时间)设置为最差,我们肯定集合中第一个过期的键至少会存在MIN_VALIDITY=TTL-(T2-T1)-CLOCK_DRIFT。所有其他密钥将在稍后到期,因此我们确信至少这次密钥将同时设置。
在设置了大部分键的期间,另一个客户端将无法获取锁,因为如果 N/2+1 个键已经存在,则 N/2+1 SET NX 操作将无法成功。因此,如果获取了锁,则不可能同时重新获取它(违反互斥属性)。但是,我们还希望确保多个客户端同时尝试获取锁不能同时成功。
如果客户端使用接近或大于锁最大有效时间(我们基本用于 SET 的 TTL)的时间锁定了大多数实例,它会认为锁无效并解锁实例,因此我们只需要考虑客户端能够在小于有效时间的时间内锁定大多数实例的情况。在这种情况下,对于上面已经表达的参数,MIN_VALIDITY没有客户端应该能够重新获取锁。因此,只有当锁定多数的时间大于 TTL 时间时,多个客户端才能同时锁定 N/2+1 个实例(“时间”为第 2 步的结束),从而使锁定无效。
我认为 Redlock 算法是一个糟糕的选择,因为它“既不是鱼也不是家禽”:它对于效率优化锁来说是不必要的重量级和昂贵的,但是对于正确性取决于锁的情况来说它不够安全。
如果您只需要尽力而为的锁定(作为效率优化,而不是为了正确性),我建议坚持使用简单的 Redis单节点锁定算法(条件设置如果不存在以获得锁定, atomic delete-if-value-matches 以释放锁),并在您的代码中非常清楚地记录锁只是近似值,有时可能会失败。不要费心设置五个 Redis 节点的集群。
另一方面,如果您需要锁定以确保正确性,请不要使用 Redlock。相反,请使用适当的共识系统,例如ZooKeeper,可能通过 实现锁的Curator 配方之一。(至少,使用具有合理事务保证的数据库。)并且请在锁定下的所有资源访问中强制使用防护令牌。
正如我开头所说的,Redis 是一个很好的工具,如果你使用得当。以上都不会削弱 Redis 对其预期目的的有用性。Salvatore多年来一直致力于该项目,其成功当之无愧。但是每个工具都有局限性,了解它们并相应地进行计划很重要。
1.单应用-本地部署(I5*8G mac)一套应用jmeter压测这个应用
2.redis master集群-4核心8G*3节点
3.zk集群 - 4核心8G*3节点
锁实现 | 集群方式 | 实现方式 | 获取锁成功率 | TPS | 取样次数 |
mysql | 乐观锁 | 待测 | 待测 | 1-3万次 | |
悲观锁 | 待测 | 待测 | 1-3万次 | ||
zk | 单节点 | curator | 100% | 1066 | 1-3万次 |
集群 | curator | 100% | 50-70 | 1-3万次 | |
redis | cluster | setNx | 100% | 100-120 | 1-3万次 |
lua脚本 | 100% | 200 | 1-3万次 | ||
redisson | 50-100% | 1100 | 1-10万次 | ||
哨兵 | setNx | 待测 | 待测 | 1-3万次 | |
lua脚本 | 待测 | 待测 | 1-3万次 | ||
redisson | 待测 | 待测 | 1-3万次 | ||
单节点 | setNx | 待测 | 待测 | 1-3万次 | |
lua脚本 | 待测 | 待测 | 1-3万次 | ||
redisson | 待测 | 待测 | 1-3万次 |