(1)什么是缓存穿透
缓存穿透是指缓存和数据库中的没有的数据,而用户不断发起请求,由于缓存时不命中时被动写入的,处于容错考虑,如果从缓存查不到的数据则不写入缓存,这将会导致不存再的数据每次请求都要到存储层(DAO和SQL)去查询,这样缓存就是去意义了。再流量大的时候,SQL数据库的压力就太大了。如果有人利用不存再的key频繁攻击我们的应用,这就是性能漏洞。
例如发起id为”-1“的数据请求,这些数据时不存在的,用户如果时故意为之,我们的系统就会遭受攻击。
(2)解决方法
接口层增加校验,例如:用户权限校验,id做范围校验,id<=0的请求直接拦截。
从缓存取不到数据时,如果在数据库也没有渠道,这时可以将key-value对写为key-null,缓存有效时间可以设置短一些,如30秒(缓存时间太长会导致正常情况也无法使用),这也是可以方式黑客利用缓存穿透进行攻击的有效手段。
一个存在value的key,在缓存创建(或过期)的一刻,同时有大量的请求,这些请求都会击穿到DB,造成瞬时DB请求量大,压力骤增。缓存击穿只会发生在高并发的时候,就是当有10000 个并发进行查询数据的时候,我们一般都会先去redis里面查询进行数据,但是如果redis里面没有这个数据的时候,那么这10000个并发里面就会有很大一部分并发会一下子都去mysql数据库里面进行查询了。这样给数据库带来巨大的压力。
例如以下示例:
(1) 业务类代码
@Override
public Category queryById(int id) {
objRedisTemplate.setKeySerializer(RedisSerializer.string());
Category category = (Category) objRedisTemplate.opsForValue().get("category-"+id);
if(category==null) {
//双重检查没有才去SQL数据库查
System.out.println("从SQL数据库读取...");
category = categoryMapper.selectById(id);
objRedisTemplate.opsForValue().set("category-"+id, category, 5, TimeUnit.MINUTES);
}else{
System.out.println("从Redis缓存读取...");
}
return category;
}
(2)控制器代码模拟同时1000个并发请求,导致缓存击穿
//模拟缓存击穿
@RequestMapping("/breakdown")
@ResponseBody
public String breakdown(Integer id){
//初始化线程池执行器
ExecutorService executorService = Executors.newFixedThreadPool(10);
//利用线程池模拟并发读取1000次
for(int i=0; i<1000; i++){
executorService.submit(new Runnable() {
@Override
public void run() {
Category category = categoryService.queryById(id);
}
});
}
executorService.shutdown();
return "success";
}
印结果打印了很多查询数据库和查询缓存,此时也就说明10000个并发里面有很多去查询了数据库,这就是高并发引起的问题。也就是缓存击穿。我们怎么解决缓存击穿呢,即使10000个并发过来,然后这10000个并发需要的数据在redis里面都没有,那么我们应该第一个线程查询数据里面的数据,然后把这个数据给放到redis里面,然后剩下的9999个线程都到redis里面查询,这样就解决了缓存击穿。
使用锁来解决缓存击穿问题,而且叫做双重检测锁,为什么叫做双重检测锁呢,因为有两个if语句,第一个if语句就是为了减少走同步代码块,因为如果换成里面存在想要的数据,就直接获取,所以有两个if语句。第一次线程查询把数据放到redis缓存之后,剩下的线程当走到下面的同步代码块的时候,需要在查询一下缓存里面的数据就会发现刚刚第一个线程放到redis里面的数据了。
@Override
public Category queryById(int id) {
redisTemplate.setKeySerializer(RedisSerializer.string());
//使用双重检查,防止缓存被击穿
//第一次检查
Category category = (Category) redisTemplate.opsForValue().get("category-"+id);
if(category==null) {
//加同步锁
synchronized (this){
//加锁后再次检查
category = (Category) redisTemplate.opsForValue().get("category-"+id);
if(category==null) {
//双重检查没有才去SQL数据库查
System.out.println("从SQL数据库读取...");
category = categoryMapper.selectById(id);
redisTemplate.opsForValue().set("category-"+id, category, 5, TimeUnit.MINUTES);
}
}
}else{
System.out.println("从Redis缓存读取...");
}
return category;
}
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
@Override
public Category queryOneWithString(int id) {
redisTemplate.setKeySerializer(RedisSerializer.string());
Category category = (Category) redisTemplate.opsForValue().get("category");
/*解决缓存穿透问题,存 null 改变 判断方式*/
if (redisTemplate.hasKey("category")){ //第一次检查 第一步解决缓存穿透
/*1 判断 redis中 是否存在数据*/
System.out.println("从 redis中获取数据");
return category;
}else {
/*解决 缓存 击穿*/
synchronized (this){
/*再次 查询 是否 存在数据*/
if (!redisTemplate.hasKey("category")){ 第二次检查 第二步 解决缓存击穿 设置双重锁
System.out.println("从mysql中获取数据");
category = categoryDao.selectById(id);
redisTemplate.opsForValue().set("category",category);
if (category == null){
redisTemplate.expire("category",30,TimeUnit.SECONDS);//设置空白缓存过期30秒
}
/*解决缓存雪崩: 设置不同的过期时间 随机*/ //第三步 解决 缓存雪崩 防止大量缓存过期
Random random = new Random();
int suiji = random.nextInt(10);
System.out.println("随机数"+suiji );
redisTemplate.expire("category",5+suiji,TimeUnit.MINUTES);
}else {
/*再从redis中获取*/
category = (Category) redisTemplate.opsForValue().get("category");
}
}
return category;
}
}
E:\09.redis\redis-samplate\001-springboot-redis
import com.powernode.dao.CategoryDao;
import com.powernode.model.Category;
import com.powernode.service.CategoryService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/*
* service:业务逻辑处理,从数据源获取数据,现在有两个数据源
* mysql:持久化,性能差
* redis:不擅长持久化,性能极高
*结合两种数据库的优势
* */
@Service
public class CategoryServiceImpl implements CategoryService {
@Autowired
private CategoryDao categoryDao;
@Autowired
private RedisTemplate redisTemplate;
/*
redisTemplate.opsForValue();//操作字符串
redisTemplate.opsForHash();//操作hash
redisTemplate.opsForList();//操作list
redisTemplate.opsForSet();//操作set
redisTemplate.opsForZSet();//操作有序set
RedisTemplate 和 stringredistemplate
如果你想使用默认的配置来操作redis,则如果操作的数据是字节数组,
就是用redistemplate,如果操作的数据是明文,使用stringredistemplate。
opsForHash() 和 boundHashOps
获取a,然后获取b,然后删除c,对同一个key有多次操作,按照opsForHash()的写法
每次都是redisTemplate.opsForHash().xxx("key","value")写法很是啰嗦
int result = (Integer) redisTemplate.opsForHash().get("hash-key","a");
result = (Integer)redisTemplate.opsForHash().get("hash-key","b");
redisTemplate.opsForHash().delete("hash-key","c");
* boundHashOps()则是直接将key和boundHashOperations对象进行了绑定,
* 后续直接通过boundHashOperations对象进行相关操作即可,写法简洁,不需要
* 每次都显式的将key写出来
BoundHashOperations boundHashOperations = redisTemplate.boundHashOps("hash-key");
*/
@Override
public List<Category> queryAll() {
redisTemplate.setKeySerializer(RedisSerializer.string());
/*1 判断 redis中 是否存在数据*/
List<Category> categorys = (List<Category>) redisTemplate.opsForValue().get("categorys");
/*2存在 则 从redis中 获取数据 返回*/
if (categorys != null){
System.out.println("从 redis中获取数据");
return categorys;
}else {
System.out.println("从mysql中获取数据");
/*3 redis中不存在,1 从mysql中获取数据 2 将数据 存入 redis 用于下次查询时 从redis中获取*/
List<Category> categories = categoryDao.selectAll();
redisTemplate.opsForValue().set("categorys",categories,5, TimeUnit.MINUTES);
return categories;
}
}
@Override
public List<Category> queryAllByHash() {
redisTemplate.setKeySerializer(RedisSerializer.string());
/*1 判断 redis中 是否存在数据*/
List<Category> categorys = (List<Category>) redisTemplate.opsForHash().values("hash:category");
System.out.println(categorys.size());
/*2存在 则 从redis中 获取数据 返回*/
if (categorys.size()>0){
System.out.println("从 redis中获取数据");
return categorys;
}else {
System.out.println("从mysql中获取数据");
categorys = categoryDao.selectAll();
categorys.forEach(x->{
redisTemplate.opsForHash().put("hash:category",x.getId(),x);
});
redisTemplate.expire("hash:category",5,TimeUnit.MINUTES);
return categorys;
}
}
@Override
public Category queryOne(int id) {
redisTemplate.setKeySerializer(RedisSerializer.string());
/*1 判断 redis中 是否存在数据*/
Category category = (Category) redisTemplate.opsForHash().get("hash:category", id);
/*2存在 则 从redis中 获取数据 返回*/
if (category != null){
System.out.println("从 redis中获取数据");
return category;
}else {
System.out.println("从mysql中获取数据");
category = categoryDao.selectById(id);
redisTemplate.opsForHash().put("hash:category",category.getId(),category);
redisTemplate.expire("hash:category",5,TimeUnit.MINUTES);
return category;
}
}
@Override
public Category queryOneWithString(int id) {
redisTemplate.setKeySerializer(RedisSerializer.string());
Category category = (Category) redisTemplate.opsForValue().get("category");
/*解决缓存穿透问题,存 null 改变 判断方式*/
if (redisTemplate.hasKey("category")){ //第一次检查 第一步解决缓存穿透
/*1 判断 redis中 是否存在数据*/
System.out.println("从 redis中获取数据");
return category;
}else {
/*解决 缓存 击穿*/
synchronized (this){
/*再次 查询 是否 存在数据*/
if (!redisTemplate.hasKey("category")){ 第二次检查 第二步 解决缓存击穿 设置双重锁
System.out.println("从mysql中获取数据");
category = categoryDao.selectById(id);
redisTemplate.opsForValue().set("category",category);
if (category == null){
redisTemplate.expire("category",30,TimeUnit.SECONDS);//设置空白缓存过期30秒
}
/*解决缓存雪崩: 设置不同的过期时间 随机*/ //第三步 解决 缓存雪崩 防止大量缓存过期
Random random = new Random();
int suiji = random.nextInt(10);
System.out.println("随机数"+suiji );
redisTemplate.expire("category",5+suiji,TimeUnit.MINUTES);
}else {
/*再从redis中获取*/
category = (Category) redisTemplate.opsForValue().get("category");
}
}
return category;
}
}
/*增删改 删掉redis数据 实现 数据同步-------------高并发redis双删 1先删redis 2再删mysql 3 延迟删除 redis*/
}