基于redisson3.5.4
Redisson 是一个基于 Java 实现的 Redis 客户端,提供操作 Redis 的各种 API;其他客户端参见:https://redis.io/docs/clients/
Jedis vs redisson
Jedis:
- redis基础操作(Map、Set、List、Queue、Deque、ScoredSortedSet、Publish/Subscribe、 BitSet…)
redisson:
- redis基础操作、PriorityQueue、DelayedQueue、BloomFilter、RateLimiter…,并增加了分布式锁和同步器
- 基于JDK提供的Lock、Semaphore、CountDownLatch等接口,实现了FairLock、MultiLock、ReadWriteLock等多种锁
5种配置模式
/**
 * Spring configuration that exposes a {@link RedissonClient} bean connected
 * to a single Redis server.
 */
@Configuration
public class RedisConfig {

    /**
     * Builds the Redisson client.
     *
     * <p>Redisson offers five configuration modes, each selected through a
     * different method on {@code Config}:
     * <ul>
     *   <li>{@code useSingleServer} - single node instance (used here)</li>
     *   <li>{@code useMasterSlaveServers} - master with slave nodes</li>
     *   <li>{@code useSentinelServers} - sentinel nodes</li>
     *   <li>{@code useClusterServers} - clustered nodes</li>
     *   <li>{@code useReplicatedServers} - replicated nodes</li>
     * </ul>
     * The multi-node modes are presumably populated through chained
     * {@code addNodeAddress(...)} calls, as noted in the original snippet.
     *
     * @return the client created from the configuration below
     * @throws IOException declared by the original signature; nothing in this
     *                     body visibly throws it
     */
    @Bean
    public RedissonClient redissonClient() throws IOException {
        Config redissonConfig = new Config();
        redissonConfig.setCodec(new StringCodec());

        // The lock watchdog timeout can be tuned with
        // redissonConfig.setLockWatchdogTimeout(...); the original note records
        // 30 * 1000 for it (presumably the default, in milliseconds).

        redissonConfig.useSingleServer()
                // Connection target: address and password are left blank here
                // and must be filled in with real values.
                .setAddress("")
                .setPassword("")
                .setDatabase(5)
                // Connection pool sizing.
                .setConnectionPoolSize(64)
                .setIdleConnectionTimeout(10000)
                // Timeouts and reconnection.
                .setConnectTimeout(3000)
                .setTimeout(3000)
                .setPingTimeout(30000)
                .setReconnectionTimeout(3000);

        return Redisson.create(redissonConfig);
    }
}
/**
 * Walks through the basic Redisson data structures (bucket, map, set, list,
 * queue): writes one value into each and prints the collection contents that
 * are read back.
 *
 * <p>Relies on a {@code redissonClient} field provided by the enclosing test
 * class.
 */
@Test
public void operationTest() {
    // String value: stored with a 5000-second time-to-live.
    RBucket<String> rBucket = redissonClient.getBucket("easicare:redisson:str");
    rBucket.set("120", 5000, TimeUnit.SECONDS);

    // Map
    RMap<String, String> map = redissonClient.getMap("easicare:redisson:map");
    map.put("id1", "119");

    // Set: print what is read back, like the list and queue below, instead
    // of issuing the read and discarding its result.
    RSet<String> rSet = redissonClient.getSet("easicare:redisson:set");
    rSet.add("idx2");
    System.out.println(rSet.readAll());

    // List
    RList<String> rList = redissonClient.getList("easicare:redisson:list");
    rList.add("idx4");
    System.out.println(rList.readAll());

    // Queue
    RQueue<String> rQueue = redissonClient.getQueue("easicare:redisson:queue");
    rQueue.add("idx5");
    System.out.println(rQueue.readAll());
}
谈到Redisson总会提到看门狗(watch dog),这里也说说自己的想法
场景:
用redis setnx实现分布式锁,会有哪些问题:
redisson的 watch dog:
从 lockInterruptibly 方法入手
再看看 tryAcquire 方法
注意:指定锁定时间时,watch dog机制失效
源码
/**
 * Acquires the lock without an explicit lease time, allowing interruption
 * while waiting.
 *
 * <p>Delegates to {@link #lockInterruptibly(long, TimeUnit)} with a lease
 * time of {@code -1} and a {@code null} unit; {@code -1} is the value that
 * {@code tryAcquireAsync} checks for before taking the renewal-scheduling
 * path.
 *
 * @throws InterruptedException if the thread is interrupted while waiting
 */
@Override
public void lockInterruptibly() throws InterruptedException {
    final long noLeaseTime = -1;
    final TimeUnit noUnit = null;
    lockInterruptibly(noLeaseTime, noUnit);
}
/**
 * Acquires the lock, blocking until it is obtained or the thread is
 * interrupted.
 *
 * <p>A first attempt is made directly. If it fails, the thread subscribes for
 * this thread id and retries in a loop, waiting on the entry's latch between
 * attempts. The subscription is always released on the way out.
 *
 * @param leaseTime lease time forwarded to {@code tryAcquire}; {@code -1}
 *                  means no explicit lease time
 * @param unit      unit of {@code leaseTime}; may be {@code null} when
 *                  {@code leaseTime} is {@code -1}
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    final long threadId = Thread.currentThread().getId();

    // Fast path: a null result from tryAcquire means the lock was acquired.
    if (tryAcquire(leaseTime, unit, threadId) == null) {
        return;
    }

    RFuture<RedissonLockEntry> subscription = subscribe(threadId);
    commandExecutor.syncSubscription(subscription);
    try {
        Long ttl;
        // Retry until tryAcquire reports success (null).
        while ((ttl = tryAcquire(leaseTime, unit, threadId)) != null) {
            if (ttl < 0) {
                // Negative value: wait for a message with no time bound.
                getEntry(threadId).getLatch().acquire();
            } else {
                // Wait for a message, but at most ttl milliseconds
                // (presumably the remaining lifetime of the current holder's lock).
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            }
        }
    } finally {
        unsubscribe(subscription, threadId);
    }
}
/**
 * Blocking form of {@code tryAcquireAsync}: starts the asynchronous attempt
 * and waits for its result via {@code get}.
 *
 * @param leaseTime lease time, or {@code -1} for none
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the thread requesting the lock
 * @return {@code null} when the lock was acquired (as the callers interpret
 *         it); otherwise a non-null value that the callers use as a wait time
 */
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> pendingAttempt = tryAcquireAsync(leaseTime, unit, threadId);
    return get(pendingAttempt);
}
/**
 * Asynchronously attempts to take the lock.
 *
 * <p>With an explicit lease time (anything other than {@code -1}) the attempt
 * is issued as-is and no renewal is scheduled, so the watchdog does not apply
 * to such locks. Without one, the lock is taken with the configured lock
 * watchdog timeout, and expiration renewal is scheduled once the attempt
 * completes successfully with a {@code null} result (lock acquired).
 *
 * @param leaseTime lease time, or {@code -1} for none
 * @param unit      unit of {@code leaseTime}
 * @param threadId  id of the thread requesting the lock
 * @return future of the attempt; it completes with {@code null} when the lock
 *         was acquired
 */
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        // Caller chose the lease time: no renewal listener is attached.
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }

    long watchdogTimeout = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    RFuture<Long> attempt =
            tryLockInnerAsync(watchdogTimeout, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    attempt.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            // Only a successful attempt with a null result means the lock is
            // ours; that is when renewal has to start.
            if (future.isSuccess() && future.getNow() == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return attempt;
}
/**
 * Schedules a one-shot timer task that extends the lock's expiration, and
 * re-schedules itself for as long as the extension keeps succeeding.
 *
 * <p>The task fires after one third of {@code internalLockLeaseTime} and runs
 * a Lua script that resets the key's expiration to
 * {@code internalLockLeaseTime} if the hash at the lock key still contains
 * the field for this thread.
 *
 * @param threadId id of the thread holding the lock
 */
private void scheduleExpirationRenewal(final long threadId) {
    // A renewal task is already registered for this entry: nothing to do.
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    // Returns 1 when the lock field still exists and its expiration was
    // refreshed, 0 otherwise.
    final String renewScript =
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;";

    TimerTask renewalTask = new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            RFuture<Boolean> renewal = commandExecutor.evalWriteAsync(
                    getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    renewScript,
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

            renewal.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> result) throws Exception {
                    // Drop the finished task first so the map never holds a
                    // stale entry and the re-schedule below can register anew.
                    expirationRenewalMap.remove(getEntryName());

                    if (result.isSuccess()) {
                        // Renewal worked: schedule the next round.
                        if (result.getNow()) {
                            scheduleExpirationRenewal(threadId);
                        }
                    } else {
                        log.error("Can't update lock " + getName() + " expiration", result.cause());
                    }
                }
            });
        }
    };

    Timeout scheduled = commandExecutor.getConnectionManager()
            .newTimeout(renewalTask, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    // Another task was registered for this entry in the meantime: keep that
    // one and cancel the task just created.
    if (expirationRenewalMap.putIfAbsent(getEntryName(), scheduled) != null) {
        scheduled.cancel();
    }
}