<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
<dependency>
<groupId>org.apache.commonsgroupId>
<artifactId>commons-pool2artifactId>
<version>2.6.0version>
dependency>
<dependency>
<groupId>org.redissongroupId>
<artifactId>redissonartifactId>
<version>3.11.2version>
dependency>
server:
port: 8206
spring:
redis:
host: xxxxx
port: 6379
database: 0
timeout: 1800000
password:
lettuce:
pool:
max-active: 20 #最大连接数
max-wait: -1 #最大阻塞等待时间(负数表示没限制)
max-idle: 5 #最大空闲
min-idle: 0 #最小空闲
/**
* Redis配置类
*/
@Configuration
@EnableCaching
public class RedisConfig {
/**
* 使用默认标签做缓存
* @return
*/
@Bean
public KeyGenerator wiselyKeyGenerator() {
return new KeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuilder sb = new StringBuilder();
sb.append(target.getClass().getName());
sb.append(method.getName());
for (Object obj : params) {
sb.append(obj.toString());
}
return sb.toString();
}
};
}
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 设置序列化对象,固定写法
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 将Redis 中 string ,hash 数据类型,自动序列化!
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
说明:通过reids客户端设置 num = 0
set num 0
@RestController
@RequestMapping("test")
public class TestController {
@Autowired
private TestService testService;
@GetMapping("testLock")
public Result testLock() {
testService.testLock();
return Result.ok();
}
}
public interface TestService {
/**
* 测试本地锁
*/
void testLock();
}
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 在缓存中存储一个num,初始值为0
* 利用缓存中的StringRedisTemplate,获取到当前的num数据值
* 如果num不为空,则需要对当前值+1操作
* 如果num为空,返回即可
*/
@Override
public void testLock() {
// 利用缓存中的StringRedisTemplate,获取到当前的num数据值
String num = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(num)) {
return;
}
// 如果num不为空,则需要对当前值+1操作
int numValue = Integer.parseInt(num);
// 写回缓存
redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
}
}
使用 ab 测试工具:httpd-tools(yum install -y httpd-tools)
ab -n(一次发送的请求数) -c(请求的并发数) 访问路径
测试如下:5000请求,100并发
ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock
结果应该为:5000
查看redis中的值:
使用ab工具压力测试:5000次请求,并发100。
查看redis中的结果:
接下来启动 8206 8216 8226 三个运行实例
运行多个service实例:
通过网关压力测试:
ab -n 5000 -c 100 http://127.0.0.1:8206/test/testLock
查看redis中的值:
以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性,此时需要分布式锁。
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。
分布式锁主流的实现方案:
每一种分布式锁解决方案都有各自的优缺点:
@Override
public void testLock() {
// 使用setnx命令
// setnx lock ok
Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "OK");
if (flag) {
// flag = true:表示获取到锁
// 执行业务逻辑
String num = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(num)) {
return;
}
int numValue = Integer.parseInt(num);
redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
// 释放锁
redisTemplate.delete("lock");
} else {
// 没有获取到锁
try {
Thread.sleep(100);
// 每隔1秒钟回调一次,再次尝试获取锁(自旋)
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
重启,服务集群,通过网关压力测试。
查看redis中num的值:
问题:setnx 刚好获取到锁,业务逻辑出现异常,导致锁无法释放。
解决:设置过期时间,自动释放锁。
设置过期时间有两种方式:
设置过期时间:
问题:可能会释放其他服务器的锁。
场景:如果业务逻辑的执行时间是7s,执行流程如下:
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。
问题:删除操作缺乏原子性。
场景:
index1 因为已经在方法中了,所以不需要重新上锁,index1有执行的权限。index1已经比较完成了,这个时候,开始执行删除的index2的锁。
@Override
public void testLock() {
// 使用setnx命令
// setnx lock ok
String uuid = UUID.randomUUID().toString();
Boolean flag = redisTemplate.opsForValue().setIfAbsent(
"lock", uuid, 3, TimeUnit.SECONDS);
if (flag) {
// flag = true:表示获取到锁
// 执行业务逻辑
String num = redisTemplate.opsForValue().get("num");
if (StringUtils.isEmpty(num)) {
return;
}
int numValue = Integer.parseInt(num);
redisTemplate.opsForValue().set("num", String.valueOf(++numValue));
// 定义一个lua脚本
String secript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 准备执行lua 脚本
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本
redisScript.setScriptText(secript);
// 设置DefaultRedisScript 这个对象的泛型
redisScript.setResultType(Long.class);
// redis调用lua脚本
redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
} else {
// 没有获取到锁
try {
Thread.sleep(100);
// 每隔1秒钟回调一次,再次尝试获取锁(自旋)
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
<dependency>
<groupId>org.redissongroupId>
<artifactId>redissonartifactId>
<version>3.11.2version>
dependency>
配置类
/**
* redisson配置信息
*/
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {
private String host;
private String addresses;
private String password;
private String port;
private int timeout = 3000;
private int connectionPoolSize = 64;
private int connectionMinimumIdleSize = 10;
private int pingConnectionInterval = 60000;
private static String ADDRESS_PREFIX = "redis://";
/**
* 自动装配
*/
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
// 判断地址是否为空
if (StringUtils.isEmpty(host)) {
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
// //redis://127.0.0.1:6379
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout)
.setPingConnectionInterval(pingConnectionInterval)
.setConnectionPoolSize(this.connectionPoolSize)
.setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
// 是否需要密码
if (!StringUtils.isEmpty(this.password)) {
serverConfig.setPassword(this.password);
}
// RedissonClient redisson = Redisson.create(config);
return Redisson.create(config);
}
}
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Override
public void testLock() {
RLock lock = redissonClient.getLock("lock");
// 开始加锁
lock.lock();
try {
String value = redisTemplate.opsForValue().get("num");
if (StringUtils.isNotEmpty(value)) {
return;
}
int num = Integer.parseInt(value);
redisTemplate.opsForValue().set("num", String.valueOf(++num));
} catch (NumberFormatException e) {
e.printStackTrace();
} finally {
// 解锁:
lock.unlock();
}
}
}
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间,超过这个时间后锁便自动解开了。
最常见的使用:
RLock lock = redisson.getLock("anyLock");
// 最常使用
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
代码实现
@GetMapping("read")
public Result<String> read(){
String msg = testService.readLock();
return Result.ok(msg);
}
@GetMapping("write")
public Result<String> write(){
String msg = testService.writeLock();
return Result.ok(msg);
}
public interface TestService {
String readLock();
String writeLock();
}
@Service
public class TestServiceImpl implements TestService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Override
public String readLock() {
// 初始化读写锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
// 获取读锁
RLock rLock = readWriteLock.readLock();
// 加10s锁
rLock.lock(10, TimeUnit.SECONDS);
String msg = this.redisTemplate.opsForValue().get("msg");
//rLock.unlock(); // 解锁
return msg;
}
@Override
public String writeLock() {
// 初始化读写锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readwriteLock");
// 获取写锁
RLock rLock = readWriteLock.writeLock();
// 加10s锁
rLock.lock(10, TimeUnit.SECONDS);
this.redisTemplate.opsForValue().set("msg", UUID.randomUUID().toString());
//rLock.unlock(); // 解锁
return "成功写入了内容";
}
}
打开两个浏览器窗口测试:
http://localhost:8206/test/read
http://localhost:8206/test/write