1.在redis中的命令语句中,命令是忽略大小写的,而key是不忽略大小写的。
string 类型 set key value
get key
hash类型 hset
set 类型 sadd
zset 是排序set集合
高并发情况下 setnx
1.场景: 在并发情况下 可能有线程1 把线程2的锁进行释放
解决: 每个线程 在lockkey 的value 设置自己的单独的value 和设置过期时间
2.在时间过期后 任务还未执行完 如何处理
luan 脚本 eval
加锁 解锁
重入锁的逻辑
redisson 是非公平锁
线程1抢到锁之后 会有监听器来监听执行 timetask 执行续命
线程2 会间隙性去判断是是否可以抢锁
双写 不一致
mvcc
分段锁
读写锁
eq:
@RequestMapping("/testStock")
public String testStock() {
String lockKey = "lock:product_101";
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式锁
redissonLock.lock(); // .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
try {
// 业务逻辑
} finally {
redissonLock.unlock();
}
return "end";
}
// name 是 lockkey
public RLock getLock(String name) {
// 构造一个 RedissonLock 对象 internalLockLeaseTime 为锁租用时间 30s
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
// todo 为啥会有打断标志
Thread.currentThread().interrupt();
}
}
// 第一步 : leaseTime -1 unit null
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 这个方法 是 线程去持有锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired 获得的
if (ttl == null) {
return;
}
// 线程2走到这里来了 redisson_lock__channel subscribe 订阅
// 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
// 这个方法是 客户端2 等待ttl 秒 再去看锁是否被持有 todo future的用法
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
// 第一步 : leaseTime -1 unit null
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
// 由于第一次进来 所以不走这个方法
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// getLockWatchdogTimeout 默认时间为30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 持有锁的 线程 加了一个Listener
// 我的理解是Future异步任务 异步任务完成的时候 会调用Listener回调方法
// future.isSuccess() 任务完成 future.getNow() 返回值(callback)
// scheduleExpirationRenewal(threadId); 这个是主要的看门狗方法
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// leaseTime 为 30s unit 为 秒 单位 command为eval
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// KEYS[1] 为 lockkeyName ARGV[1] 为 id:线程id ARGV[2]: 持有时间 默认30s
// exists 判断key是否存在 不存在 设置值 和 时间
// hexists 判断key val 是否一致 一致的话 当前线程 值 加 1
// pttl key 以毫秒为单位返回 key 的剩余生存时间 当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1 。
// 否则,以毫秒为单位,返回 key 的剩余生存时间。
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
第一个请求去加锁过程中 会持有一个Listener 这个 是 看门狗的实现原理
只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
// expirationRenewalMap 到期续约map 这个是ConcurrentMap key id:name Timeout 时间
/*
* hexists 查看哈希表 key 中,给定域 field 是否存在
* pexpire 续时 30s
* TimerTask 定时任务 每隔10s运行一次
*
*/
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
// expirationRenewalMap 直接 移出 key
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// putIfAbsent 如何key值不存在,则插入map,返回Null。如果key值存在,不替换旧值,且返回旧的value。
// 所以下面的情况是 首次进来没有key 所以 map set key
// 如果 value发生了变化 定时任务取消
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
// Future future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
// 这里是把锁给删除
// 重入锁有减1
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
void cancelExpirationRenewal() {
// expirationRenewalMap 移出key 定时任务取消
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
}
``
### redis 的 stream
1.生产者
redis 的多线程
redis为什么这么快 1.内存 2.单线程 3.非阻塞式io 4.rest