乐观锁基于CAS(Compare And Swap)思想(比较并替换),是不具有互斥性,不会产生锁等待而消耗资源,但是需要反复的重试,但也是因为重试的机制,能比较快的响应。因此我们可以利用redis来实现乐观锁。具体思路如下:
Redis乐观锁实现秒杀
- public class Second {
- public static void main(String[] arg) {
- String redisKey = "lock";
- ExecutorService executorService = Executors.newFixedThreadPool(20);
- try {
- Jedis jedis = new Jedis("127.0.0.1", 6378);
- // 初始值
- jedis.set(redisKey, "0");
- jedis.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- for (int i = 0; i < 1000; i++) {
- executorService.execute(() -> {
- Jedis jedis1 = new Jedis("127.0.0.1", 6378);
- try {
- jedis1.watch(redisKey);
- String redisValue = jedis1.get(redisKey);
- int valInteger = Integer.valueOf(redisValue);
- String userInfo = UUID.randomUUID().toString();
- // 没有秒完
- if (valInteger < 20) {
- Transaction tx = jedis1.multi();
- tx.incr(redisKey);
- List list = tx.exec();
- // 秒成功 失败返回空list而不是空
- if (list != null && list.size() > 0) {
- System.out.println("用户:" + userInfo + ",秒杀成功!当前成功人数:" + (valInteger + 1));
- }
- // 版本变化,被别人抢了。
- else {
- System.out.println("用户:" + userInfo + ",秒杀失败");
- }
- }
- // 秒完了
- else {
- System.out.println("已经有20人秒杀成功,秒杀结束");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- jedis1.close();
- }
- });
- }
- executorService.shutdown();
- }
- }
实现原理
单应用中使用锁:(单进程多线程):synchronized、ReentrantLock
分布式应用中使用锁:(多进程多线程):
获取锁:
方式1(使用set命令实现)--推荐
- /**
- * 使用redis的set命令实现获取分布式锁
- *
- * @param lockKey 可以就是锁
- * @param requestId 请求ID,保证同一性 uuid+threadID
- * @param expireTime 过期时间,避免死锁
- * @return
- */
- public boolean getLock(String lockKey,String requestId,int expireTime){
- //NX:保证互斥性
- // hset 原子性操作 只要lockKey有效 则说明有进程在使用分布式锁
- String result=jedis.set(lockKey,requestId,"NX","EX",expireTime);
- if("OK".equals(result)){
- return true;
- }
- return false;
- }
方式2(使用setnx命令实现) -- 并发会产生问题
- public boolean getLock(String lockKey,String requestId,int expireTime){
- Long result=jedis.setnx(lockKey,requestId);
- if(result==1){
- //成功设置 进程down 永久有效 别的进程就无法获得锁
- jedis.expire(lockKey,expireTime);
- return true;
- }
- return false;
- }
释放锁
方式1(del命令实现) -- 并发
- /**
- * 释放分布式锁
- * @param lockKey
- * @param requestId
- */
- public static void releaseLock(String lockKey,String requestId) {
- if (requestId.equals(jedis.get(lockKey))) {
- jedis.del(lockKey);
- }
- }
问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。
方式2(redis+lua脚本实现)--推荐
- public static boolean releaseLock(String lockKey, String requestId) {
- String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
- if (result.equals(1L)) {
- return true;
- }
- return false;
- }
存在问题

本质分析:
CAP模型分析:
为什么还可以用Redis实现分布式锁?
- <dependency>
- <groupId>org.redissongroupId>
- <artifactId>redissonartifactId>
- <version>2.7.0version>
- dependency>
- public class RedissonManager {
- private static Config config = new Config();
- //声明redisso对象
- private static Redisson redisson = null;
-
- //实例化redisson
- static {
- config.useClusterServers()
- // 集群状态扫描间隔时间,单位是毫秒
- .setScanInterval(2000)
- //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
- .addNodeAddress("redis://127.0.0.1:6379")
- .addNodeAddress("redis://127.0.0.1:6380")
- .addNodeAddress("redis://127.0.0.1:6381")
- .addNodeAddress("redis://127.0.0.1:6382")
- .addNodeAddress("redis://127.0.0.1:6383")
- .addNodeAddress("redis://127.0.0.1:6384");
- //得到redisson对象
- redisson = (Redisson) Redisson.create(config);
- }
-
- //获取redisson对象的方法
- public static Redisson getRedisson() {
- return redisson;
- }
- }
- public class DistributedRedisLock {
- //从配置类中获取redisson对象
- private static Redisson redisson = RedissonManager.getRedisson();
- private static final String LOCK_TITLE = "redisLock_";
-
- //加锁
- public static boolean acquire(String lockName) {
- //声明key对象
- String key = LOCK_TITLE + lockName;
- //获取锁对象
- RLock mylock = redisson.getLock(key);
- //加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
- mylock.lock(2, 3, TimeUtil.SECOND);
- //加锁成功
- return true;
- }
-
- //锁的释放
- public static void release(String lockName) {
- //必须是和加锁时的同一个key
- String key = LOCK_TITLE + lockName;
- //获取所对象
- RLock mylock = redisson.getLock(key);
- //释放锁(解锁)
- mylock.unlock();
- }
- }
- public String discount() throws IOException{
- String key = "lock001";
- //加锁
- DistributedRedisLock.acquire(key);
- //执行具体业务逻辑
- dosoming
- //释放锁
- DistributedRedisLock.release(key);
- //返回结果
- return soming;
- }

加锁机制
- "if (redis.call('exists',KEYS[1])==0) then "+ --看有没有锁
- "redis.call('hset',KEYS[1],ARGV[2],1) ; "+ --无锁 加锁
- "redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
- "return nil; end ;" +
- "if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+ --我加的锁
- "redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+ --重入锁
- "redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
- "return nil; end ;" +
- "return redis.call('pttl',KEYS[1]) ;" --不能加锁,返回锁的时间
lua的作用:保证这段复杂业务逻辑执行的原子性。
lua的解释:
第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。如何加锁呢?很简单,用下面的命令:
hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:
myLock :{"8743c9c0-0795-4907-87fd-6c719a6b4586:1":1 }
上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。
接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。
锁互斥机制
那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?
自动延时机制
只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
可重入锁机制
incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
myLock :{"8743c9c0-0795-4907-87fd-6c719a6b4586:1":2 }
释放锁机制
执行lua脚本如下:
- #如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "return 1; " +
- "end;" +
- # key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。 不是我加的锁 不能解锁
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
- "return nil;" +
- "end; " +
- # 将value减1
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
- # 如果counter>0说明锁在重入,不能删除key
- "if (counter > 0) then " +
- "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- "return 0; " +
- # 删除key并且publish 解锁消息
- "else " +
- "redis.call('del', KEYS[1]); " + #删除锁
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "return 1; "+
- "end; " +
- "return nil;",
- 如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
- 其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。
- 如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
- “del myLock”命令,从redis里删除这个key。
- 然后呢,另外的客户端2就可以尝试完成加锁了。
数据并发竞争:利用分布式锁可以将处理串行化,前面已经讲过了。
防止库存超卖

可以采用分布式锁解决这个问题。

订单1和订单2都从Redis中获得分布式锁(setnx),谁能获得锁谁进行下单操作,这样就把订单系统下单的顺序串行化了,就不会出现超卖的情况了。伪码如下:
- //加锁并设置有效期
- if(redis.lock("RDL",200)){
- //判断库存
- if (orderNum
- //加锁成功 ,可以下单
- order(5);
- //释放锁
- redis,unlock("RDL");
- }
- }
注意此种方法会降低处理效率,这样不适合秒杀的场景,秒杀可以使用CAS和Redis队列的方式。
6、Zookeeper分布式锁的对比
- 基于Redis的set实现分布式锁
- 基于zookeeper临时节点的分布式锁

- 基于etcd实现
三者的对比,如下表
Redis zookeeper etcd 一致性算法 无 paxos(ZAB) raft CAP AP CP CP 高可用 主从集群 n+1 (n至少为2) n+1 接口类型 客户端 客户端 http/grpc 实现 setNX createEphemeral restful API