目录
第一步,定义拦截器
- package com.hmdp.utils;
-
- import com.hmdp.dto.UserDTO;
- import com.hmdp.entity.User;
- import org.springframework.web.servlet.HandlerInterceptor;
-
- import javax.servlet.http.HttpServletRequest;
- import javax.servlet.http.HttpServletResponse;
- import javax.servlet.http.HttpSession;
-
- public class LoginInterceptor implements HandlerInterceptor {
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- //1、获取session
- HttpSession session = request.getSession();
- //2、获取session中的用户
- Object user = session.getAttribute("user");
- //3、判断用户是否存在
- if (user == null) {
- //4、不存在,拦截,返回401状态码
- response.setStatus(401);
- return false;
- }
- //5、存在,保存用户信息到ThreadLocal
- UserHolder.saveUser((UserDTO) user);
- //6、放行
- return true;
- }
-
- @Override
- public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
- //移除用户,避免内存泄漏
- UserHolder.removeUser();
- }
- }
userhold类下
- package com.hmdp.utils;
-
- import com.hmdp.dto.UserDTO;
-
- public class UserHolder {
- private static final ThreadLocal
tl = new ThreadLocal<>(); -
- public static void saveUser(UserDTO user){
- tl.set(user);
- }
-
- public static UserDTO getUser(){
- return tl.get();
- }
-
- public static void removeUser(){
- tl.remove();
- }
- }
第二步:配置文件(让拦截器生效)
- package com.hmdp.config;
-
- import com.hmdp.utils.LoginInterceptor;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
- import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
-
- @Configuration
- public class MvcConfig implements WebMvcConfigurer {
- //配置,添加拦截器,让之前的拦截器生效
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- registry.addInterceptor(new LoginInterceptor())
- //排除掉不需要拦截的路径
- .excludePathPatterns(
- "/shop/**",
- "/voucher/**",
- "/shop-type/**",
- "/upload/**",
- "/blog/hot",
- "/usr/code",
- "/usr/login"
- );
- }
- }
session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务是导致数据丢失问题
session的代替为redis,满足数据共享,内存存储key、value结构

保存登录的用户信息,可以使用String结构,以JSON字符串来保存,比较直观:

Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少:



自己创建的类,无法直接用@Autowire方式注入,因为他不属于 spring容器管理的。
需要在创建的加入一个构造方法,然后在由其他由spring管理的类调用,然后在注入传入这个属性即可
- @Configuration
- public class MvcConfig implements WebMvcConfigurer {
- //由spring管理的类注册
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- //配置,添加拦截器,让之前的拦截器生效
- @Override
- public void addInterceptors(InterceptorRegistry registry) {
- //调用时传入这个即可
- registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
-
- }
- public class LoginInterceptor implements HandlerInterceptor {
-
- private StringRedisTemplate stringRedisTemplate;
-
- public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
- this.stringRedisTemplate = stringRedisTemplate;
- }
- }
缓存就是数据交换的缓冲区(称作Cache,是存储数据的临时笛梵,一般读写性能较高)


- //1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
- String shopJson = stringRedisTemplate.opsForValue().get(key);
-
- //2、判断是否存在
- if (StrUtil.isNotBlank(shopJson)){
- //3、存在,直接返回缓存中的
- //是json数据,则需要通过JSONUtil返回指定的对象即可
- Shop shop= JSONUtil.toBean(shopJson, Shop.class);
- return Result.ok(shop);
- }
- //4、缓存中不存在,根据id查询数据库
- Shop shop = this.getById(id);
- //5、数据库中不存在,返回错误
- if (shop==null){
- return Result.fail("商铺不存在");
- }
- //6、存在,写入缓存
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
- //7、返回
- @Resource
- private StringRedisTemplate stringRedisTemplate;
- @Override
- public Result shopTypeList() {
- String key="shop_Type_List";
- //查询缓存
- String shopTypeJson = stringRedisTemplate.opsForValue().get(key);
-
- if (StrUtil.isNotBlank(shopTypeJson)){
- //查到了,直接返回,json转list集合
- List
shopTypes = JSONUtil.toList(shopTypeJson, ShopType.class); - return Result.ok(shopTypes);
- }
- //缓存没查到,查数据库
- List
typeList = this.query().orderByAsc("sort").list(); - //数据库没查到,返回错误
- if (typeList==null){
- return Result.fail("没有列表信息");
- }
- //数据库查到,缓存下
- //list集合转json
- String json = JSONUtil.toJsonStr(typeList);
- stringRedisTemplate.opsForValue().set(key, json);

业务场景
低一致性需求:使用内存淘汰机制,例如店铺类型的查询
高一致性需求;主动更新,并以超时剔除作为兜底方案,例如店铺的详细信息

操作缓存和数据库的三个问题需要考虑:
1,删除缓存还是更新缓存?
2、如何保证缓存与数据库的操作同时成功或者失败?
3、先操作缓存还是先操作数据库

缓存更新策略的最佳实践方案:
1、低一致性需求:使用Redis自带的内存淘汰机制
2、高一致性需求:主动更新,并以超时提出作为兜底方案
读操作:
写操作::
我们使用Redis大部分情况都是通过Key查询对应的值,假如发送的请求传进来的key是不存在Redis中的,那么就查不到缓存,查不到缓存就会去数据库查询。假如有大量这样的请求,这些请求像“穿透”了缓存一样直接打在数据库上,这种现象就叫做缓存穿透。
解决方案:
1、缓存空对象(把无效的Key存进Redis中)。如果Redis查不到数据,数据库也查不到,我们把这个Key值保存进Redis,设置value="null",当下次再通过这个Key查询时就不需要再查询数据库。这种处理方式肯定是有问题的,假如传进来的这个不存在的Key值每次都是随机的,那存进Redis也没有意义,会占用redis的内存空间,所以设置过期时间是有必要的,其次,当这个值一开始没有内容,我们查询数据库后 ,将null赋值给这个值,并存在redis中,而之后我们数据库新增了这个值,但是缓存中还是为null,这就会导致短期数据不一致,可以使用更新数据库删除那个缓存就可以解决。
2、使用布隆过滤器。布隆过滤器的作用是某个 key 不存在,那么就一定不存在,它说某个 key 存在,那么很大可能是存在(存在一定的误判率)。于是我们可以在缓存之前再加一层布隆过滤器,在查询的时候先去布隆过滤器查询 key 是否存在,如果不存在就直接返回,这个布隆过滤器也存在一定的穿透风险。

项目中解决缓存穿透的思路

null表示的是一个对象的值,而非一个字符串。例如声明一个对象的引用,String aaa = null ;
""表示的是一个长度为0的空字符串。例如声明一个字符串String bbb = "" ;
所以:null不指向任何对象,相当于没有任何值;而""代表一个长度为0的字符串。
- public Result queryById(Long id) {
- String key=CACHE_SHOP_KEY+id;
- //1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
- String shopJson = stringRedisTemplate.opsForValue().get(key);
-
- //2、判断是否存在
- if (StrUtil.isNotBlank(shopJson)){
- //3、存在,直接返回缓存中的
- //是json数据,则需要通过JSONUtil返回指定的对象即可
- Shop shop= JSONUtil.toBean(shopJson, Shop.class);
- return Result.ok(shop);
- }
- //4、缓存中不存在,判断是否命中的为空串""
- if (shopJson!=null){
- return Result.fail("商铺不存在");
- }
- Shop shop = this.getById(id);
- //5、数据库中不存在,并返回错误
- if (shop==null){
- //插入一个空串,设置过期时间
- stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES);
- return Result.fail("商铺不存在");
- }
- //6、存在,写入缓存
- stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
- //7、返回
- return Result.ok(shop);
- }
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求给数据库带来巨大压力
缓存穿透的解决方案有?
当某一个时刻出现大规模的redis缓存失效的情况,就会导致大量的请求直接打在数据库上面,导致数据库压力巨大,如果在高并发的情况下,可能瞬间就会导致数据库宕机。这时候如果运维马上又重启数据库,马上又会有新的流量把数据库打死。这就是缓存雪崩。缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
1、在原有的失效时间上加上一个随机值,这样就避免了因为采用相同的过期时间导致的缓存雪崩。
如果真的发生了缓存雪崩,有没有什么兜底的措施?
2、使用熔断机制。当流量到达一定的阈值时,就直接返回“系统拥挤”之类的提示,防止过多的请求打在数据库上。至少能保证一部分用户是可以正常使用,其他用户多刷新几次也能得到结果。
3、提高数据库的容灾能力,可以使用分库分表,读写分离的策略。
4、为了防止Redis宕机导致缓存雪崩的问题,可以搭建Redis集群,提高Redis的容灾性

其实跟缓存雪崩有点类似,缓存雪崩是大规模的key失效,而缓存击穿是一个热点的Key,有大并发集中对其进行访问,突然间这个Key失效了,导致大并发全部打在数据库上,导致数据库压力剧增。这种现象就叫做缓存击穿。
解决方案:
1、业务允许的话,对于热点的key可以设置永不过期的key。
2、使用互斥锁。如果缓存失效的情况,只有拿到锁才可以查询数据库,降低了在同一时刻打在数据库上的请求,防止数据库打死。当然这样会导致系统的性能变差。
多条线程同时访问数据库



修改id查询店铺 ,基于互斥锁来解决缓存击穿问题

- /**
- * 互斥锁解决缓存击穿
- * @param id
- * @return
- */
- public Shop queryWithMutex(Long id){
- String key=CACHE_SHOP_KEY+id;
- //1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
- String shopJson = stringRedisTemplate.opsForValue().get(key);
-
- //2、判断是否存在
- if (StrUtil.isNotBlank(shopJson)){
- //3、存在,直接返回缓存中的
- //是json数据,则需要通过JSONUtil返回指定的对象即可
- Shop shop= JSONUtil.toBean(shopJson, Shop.class);
- return shop;
- }
- //4、缓存中不存在,判断是否命中的为空串""
- if (shopJson!=null){
- return null;
- }
-
- //4实现缓存重建
- //4.1获取互斥锁
- String lockKey=LOCK_SHOP_KEY+id;
- Shop shop = null;
- try {
- boolean isLock = tryLock(lockKey);
- //4.2判断是否获取成功
- if (!isLock){
- //4.3失败,则休眠并重试
- Thread.sleep(99);
- return queryWithMutex(id);
- }
- //4.4成功,根据id查询数据库
- shop = this.getById(id);
- //模拟重建延时
- Thread.sleep(366);
- //5、数据库中不存在,并返回错误
- if (shop==null){
- //插入一个空串,设置过期时间
- stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES);
- return null;
- }
- //6、存在,写入缓存
- stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- //7、释放互斥锁
- unLock(lockKey);
- }
- //8、返回
- return shop;
- }
基于逻辑过期解决缓存击穿问题

- /**
- * 逻辑辑过期解决缓存击穿
- * @param id
- * @return
- */
- //创建一个线程池
- private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
-
- public Shop queryWithLogicalExpire(Long id){
- String key=CACHE_SHOP_KEY+id;
- //1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
- String shopJson = stringRedisTemplate.opsForValue().get(key);
-
- //2、判断是否存在
- if (StrUtil.isBlank(shopJson)){
- //3不存在,直接返回null
- return null;
- }
- //4命中,需要把json反序列化为对象
- RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
- JSONObject data = (JSONObject) redisData.getData();
- Shop shop = JSONUtil.toBean(data, Shop.class);
- LocalDateTime expireTime = redisData.getExpireTime();
- //5、判断是否过期,是否在当前时间之后
- if (expireTime.isAfter(LocalDateTime.now())){
- //5。1没过期。直接返回店铺信息
- return shop;
- }
- //5.2已过期,需要缓存
- //6、缓存重建
- //6/1获取互斥锁
- String lockKey= LOCK_SHOP_KEY+id;
- boolean isLock = tryLock(lockKey);
- //6.2判断是否获取锁成功
- if (isLock){
- //6.3成功,开启独立线程,实现缓存重建
- CACHE_REBUILD_EXECUTOR.submit(()->{
- try {
- //重建缓存
- this.saveShop2Redis(id,20L);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- //释放锁
- unLock(lockKey);
- }
-
- });
- }
-
- return shop;
- }

修改数据据库中的一点信息,会发现某一时刻重建前是旧数据,完成后是新数据

控制台中只有一数据重建,一次是查询旧数据,一次为新数据重建

基于stringRedisTemplate封装一个缓存工具类,满足下列需求:
方法1:将任意va对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意/ava对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
- package com.hmdp.utils;
-
- import cn.hutool.core.util.BooleanUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.stereotype.Component;
-
-
- import java.time.LocalDateTime;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import java.util.function.Function;
-
- import static com.hmdp.utils.RedisConstants.LOCK_SHOP_KEY;
-
- @Slf4j
- @Component
-
- /**
- * 缓存工具类1
- */
- public class CacheClient {
- private final StringRedisTemplate stringRedisTemplate;
-
- public CacheClient(StringRedisTemplate stringRedisTemplate){
- this.stringRedisTemplate = stringRedisTemplate;
- }
-
- public void set(String key, Object value, Long time, TimeUnit unit){
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
- }
-
- public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
- //设置逻辑过期
- RedisData redisData = new RedisData();
- redisData.setData(value);
- redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
- //写入Redis
- stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
- }
-
- public
R queryWithPassThrough( - String keyPrefix, ID id, Class
type, Function dbFallback, Long time, TimeUnit unit) { - String key = keyPrefix + id;
- //1.从redis查询商铺缓存
- String json = stringRedisTemplate.opsForValue().get(key);
- //2.判断是否存在
- if (StrUtil.isNotBlank(json)){
- //3.存在,直接返回
- return JSONUtil.toBean(json, type);
- }
- //判断命中的是否是空值
- if (json != null){
- //返回一个错误信息
- return null;
- }
- //4.不存在,根据id查询数据库
- R r = dbFallback.apply(id);
- //5.不存在,返回错误
- if (r == null){
- //将空值写入redis
- stringRedisTemplate.opsForValue().set(key,"",2,TimeUnit.MINUTES);
- //返回错误信息
- return null;
- }
- //6.存在写入redis
- this.set(key,r,time,unit);
- return r;
- }
-
- /**
- * 逻辑删除解决缓存击穿
- */
-
- private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
-
- public
R queryWithLogicalExpire( - String keyPrefix, ID id, Class
type, Function dbFallback, Long time, TimeUnit unit) { - String key = keyPrefix + id;
- //1.从redis查商铺缓存
- String json = stringRedisTemplate.opsForValue().get(key);
- //2.判断是否存在
- if (StrUtil.isBlank(json)) {
- //3.存在,缓存中存的null
- return null;
- }
- //4.命中,先把json反序列化为对象
- RedisData redisData = JSONUtil.toBean(json, RedisData.class);
- R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
- LocalDateTime expireTime = redisData.getExpireTime();
- //5.判断是否过期
- if (expireTime.isAfter(LocalDateTime.now())) {
- //5.1未过期,直接返回店铺信息
- return r;
- }
- //5.2已过期,需要缓存重建
- //6.缓存重建
- //6.1获取互斥锁
- String lockKey = LOCK_SHOP_KEY + id;
- boolean isLock = tryLock(lockKey);
- //6.2判断是否获取锁成功
- if (isLock) {
- //6.3 成功,再进行二次判断,查看缓存中是否有数据,因为有可能是别人刚刚重建完释放锁,刚好获取到了
-
- //6.4 开启独立线程,实现缓存重建
- CACHE_REBUILD_EXECUTOR.submit(() -> {
- try {
- //查询数据库
- R r1 = dbFallback.apply(id);
- //写入redis
- this.setWithLogicalExpire(key, r1, time, unit);
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- //释放锁
- unlock(lockKey);
- }
- });
- }
- //6.6返回过期的商铺信息
- return r;
- }
-
-
- //获取锁和开锁
- private boolean tryLock(String key){
- Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
- return BooleanUtil.isTrue(flag);
- }
- private void unlock(String key){
- stringRedisTemplate.delete(key);
- }
- }
-
-
全局id生成器
全局ID生成器,是一种在分布式系统下用来生成全局唯一id的工具,一般满足:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
- @Component
- public class RedisWorker {
-
- private static final long BEGIN_TIMESTAMP=1640995000L;
- private static final long COUNT_BITS=32;
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- public long nextId(String keyPrefix){
- //1、生成时间戳
- LocalDateTime now=LocalDateTime.now();
- long nowSecond= now.toEpochSecond(ZoneOffset.UTC);
- long timestamp = nowSecond - BEGIN_TIMESTAMP;
-
- //2、生成序列号
- //2、1获取当前时间精确到天
- String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
- //2、2自增长
- Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
-
- //拼接并返回(先向左移把右边空出,然后在或,相当于加上)
- return timestamp<
- }
-
- }
测试类中
- private ExecutorService es= Executors.newFixedThreadPool(500);
- @Test
- void testIdWordker() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(300);
-
- Runnable task=()->{
- for (int i=0;i<100;i++){
- long id = redisWorker.nextId("order");
- System.out.println("id="+id);
- }
- latch.countDown();
- };
-
- long begin = System.currentTimeMillis();
- for (int i = 0; i <100 ; i++) {
- es.submit(task);
- }
- latch.await();
- long end = System.currentTimeMillis();
- System.out.println("time="+(end-begin));
- }
全局唯一id生成策略
- UUID
- redis自增
- snowflake算法
- 数据库自增
redis自增id策略
- 每天一个key,方便统计订单量
- id构造是 时间戳+计数器
优惠券添加
没有后台只能通过postman添加
- {
- "shopId": 1,
- "title": "200元代金券",
- "subTitle": "周六周末可用",
- "rules": "全场通用\\n可以叠加\\n仅限制堂食",
- "payValue": 16000,
- "actualValue": 20000,
- "type": 1,
- "stock": 100,
- "beginTime": "2023-08-01T00:00:00",
- "endTime": "2024-08-01T00:00:00"
- }


优惠券秒杀下单

下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单

- @Service
- public class VoucherOrderServiceImpl extends ServiceImpl
implements IVoucherOrderService { - @Resource
- private ISeckillVoucherService seckillVoucherService;
- @Resource
- private RedisWorker redisWorker;
- /**
- * 优惠券下单
- * @param voucherId
- * @return
- */
- @Override
- @Transactional
- public Result seckillVoucher(Long voucherId) {
- //1、查询优惠券
- SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
- //2、判断秒杀是否开始
- if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
- //尚未开始
- return Result.fail("秒杀还未开始");
- }
- //3、判断秒杀是否结束
- if (voucher.getEndTime().isBefore(LocalDateTime.now())){
- //已经结束
- return Result.fail("秒杀已经结束");
- }
- //4、判断库存是否充足
- if (voucher.getStock()<1){
- //库存不足
- return Result.fail("已经被抢完");
- }
- //5、扣减库存
- boolean success = seckillVoucherService.update()
- .setSql("stock=stock-1")
- .eq("voucher_id", voucherId).update();
- if (!success)return Result.fail("库存不足");
- //6创建秒杀券订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //6.1订单id
- long orderId = redisWorker.nextId("order");
- voucherOrder.setId(orderId);
- //6.2用户id
- Long userId = UserHolder.getUser().getId();
- voucherOrder.setUserId(userId);
- //6.3代金券id
- voucherOrder.setVoucherId(voucherId);
- //添加订单
- save(voucherOrder);
- //7返回订单id
- return Result.ok(orderId);
- }
- }

JMeter线程测试遇到401错误
这是未授权问题
用 F12 打开开发者工具
在network网络 里寻找相关信息:

添加一个信息管理器

订单出现了超卖

超卖问题分析
超卖问题是多线程安全问题,即在一个线程还没执行完,其他线程抢先执行,对同一个数据进行修改

乐观锁
乐观锁的关键是判断之前查询到的数据是否被修改,常见的方式有
版本号法
在修改之前查询一次版本号,若版本号不变则说明没有被其他线程修改,则正常进行数据修改,并让版本加一,若是版本号不一致,则不会执行


CAS法
直接比较数据是否发生了改变,若不变则说明安全

如果弄数据是否和之前一致,会导致成功率低
设置200个线程只卖出23个

- @Service
- public class VoucherOrderServiceImpl extends ServiceImpl
implements IVoucherOrderService { - @Resource
- private ISeckillVoucherService seckillVoucherService;
- @Resource
- private RedisWorker redisWorker;
- /**
- * 优惠券下单
- * @param voucherId
- * @return
- */
- @Override
- @Transactional
- public Result seckillVoucher(Long voucherId) {
- //1、查询优惠券
- SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
- //2、判断秒杀是否开始
- if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
- //尚未开始
- return Result.fail("秒杀还未开始!");
- }
- //3、判断秒杀是否结束
- if (voucher.getEndTime().isBefore(LocalDateTime.now())){
- //已经结束
- return Result.fail("秒杀已经结束");
- }
- //4、判断库存是否充足
- if (voucher.getStock()<1){
- //库存不足
- return Result.fail("已经被抢完");
- }
- //5、扣减库存
- boolean success = seckillVoucherService.update()
- .setSql("stock=stock-1") //set stock=stock-1
- .eq("voucher_id", voucherId).gt("stock", 0)//where id=? and stock=?只要大于0即可
- .update();
- if (!success)return Result.fail("库存不足");
- //6创建秒杀券订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //6.1订单id
- long orderId = redisWorker.nextId("order");
- voucherOrder.setId(orderId);
- //6.2用户id
- Long userId = UserHolder.getUser().getId();
- voucherOrder.setUserId(userId);
- //6.3代金券id
- voucherOrder.setVoucherId(voucherId);
- //添加订单
- save(voucherOrder);
- //7返回订单id
- return Result.ok(orderId);
- }
- }
超卖这样的线程安全问题,解决方案有哪些?
1.悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 缺点:性能一般
乐观锁:不加锁,在更新时判断是否有其它线程在修改
- 优点:性能好
- 缺点:存在成功率低的问题
给整个this对象上锁
优点是简单安全
缺点是性能低,因为这样所有用户都被锁上了,我们的初衷是,单个用户中,只能单卖,这样就会导致其他用户也会受到影响
- @Service
- public class VoucherOrderServiceImpl extends ServiceImpl
implements IVoucherOrderService { - @Resource
- private ISeckillVoucherService seckillVoucherService;
- @Resource
- private RedisWorker redisWorker;
- /**
- * 优惠券下单
- * @param voucherId
- * @return
- */
- @Override
- public Result seckillVoucher(Long voucherId) {
- //1、查询优惠券
- SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
- //2、判断秒杀是否开始
- if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
- //尚未开始
- return Result.fail("秒杀还未开始!");
- }
- //3、判断秒杀是否结束
- if (voucher.getEndTime().isBefore(LocalDateTime.now())){
- //已经结束
- return Result.fail("秒杀已经结束");
- }
- //4、判断库存是否充足
- if (voucher.getStock()<1){
- //库存不足
- return Result.fail("已经被抢完");
- }
-
-
- //7返回订单id
- return createVoucherOrder(voucherId);
- }
-
- @Transactional
- public synchronized Result createVoucherOrder(Long voucherId){
- //5、一人一单
- Long userId = UserHolder.getUser().getId();
- //5.1查询订单
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- //判断是否已经存在
- if (count>0){
- //用户已经购买过
- return Result.fail("你已经购买过这个券了");
- }
-
-
- //6、扣减库存
- boolean success = seckillVoucherService.update()
- .setSql("stock=stock-1") //set stock=stock-1
- .eq("voucher_id", voucherId).gt("stock", 0)//where id=? and stock=?只要大于0即可
- .update();
- if (!success)return Result.fail("库存不足");
- //6创建秒杀券订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //6.1订单id
- long orderId = redisWorker.nextId("order");
- voucherOrder.setId(orderId);
- //6.2用户id
-
- voucherOrder.setUserId(userId);
- //6.3代金券id
- voucherOrder.setVoucherId(voucherId);
- //添加订单
- save(voucherOrder);
- //返回订单id
- return Result.ok(orderId);
- }
- }
悲观锁升级后
- @Override
- public Result seckillVoucher(Long voucherId) {
- //1、查询优惠券
- SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
- //2、判断秒杀是否开始
- if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
- //尚未开始
- return Result.fail("秒杀还未开始!");
- }
- //3、判断秒杀是否结束
- if (voucher.getEndTime().isBefore(LocalDateTime.now())){
- //已经结束
- return Result.fail("秒杀已经结束");
- }
- //4、判断库存是否充足
- if (voucher.getStock()<1){
- //库存不足
- return Result.fail("已经被抢完");
- }
- Long userId = UserHolder.getUser().getId();
-
- synchronized (userId.toString().intern()){//intern获取源对象,new多少次都只是从常量池中寻找
- //拿到当前对象的代理对象(事务的对象)
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- return proxy.createVoucherOrder(voucherId);//要想让事务生效,必须要有代理对象
- }
- }
- @Transactional
- public Result createVoucherOrder(Long voucherId){
- //5、一人一单
- Long userId = UserHolder.getUser().getId();
- //5.1查询订单
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- //判断是否已经存在
- if (count>0){
- //用户已经购买过
- return Result.fail("你已经购买过这个券了");
- }
-
-
- //6、扣减库存
- boolean success = seckillVoucherService.update()
- .setSql("stock=stock-1") //set stock=stock-1
- .eq("voucher_id", voucherId).gt("stock", 0)//where id=? and stock=?只要大于0即可
- .update();
- if (!success)return Result.fail("库存不足");
- //6创建秒杀券订单
- VoucherOrder voucherOrder = new VoucherOrder();
- //6.1订单id
- long orderId = redisWorker.nextId("order");
- voucherOrder.setId(orderId);
- //6.2用户id
-
- voucherOrder.setUserId(userId);
- //6.3代金券id
- voucherOrder.setVoucherId(voucherId);
- //添加订单
- save(voucherOrder);
- //返回订单id
- return Result.ok(orderId);
- }
一人一单的并发安全问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
1.我们将服务启动两份,端口分别为8081和8082:
2.然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡
nginx配置实现了反代理和负载均衡


发现线程不安全

集群下的锁监听器tomcat等不是同一个
分布式锁
什么是分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

分布式锁的实现
分布式锁的核心是是实现多进程之间互斥,常见的有三种

实现分布式锁时需要实现两个基本方法
获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
添加锁,nx是互斥(当存在则不执行),ex是设置超时时间
local是键 thread1是值

释放锁:
- 手动释放
- 超时释放:获取锁时添加一个超时时间
释放锁,直接删除即可

业务流程

基于Redis实现分布式锁的初级版本
锁的类
- public class SimpleRedisLock implements ILock{
-
- private StringRedisTemplate stringRedisTemplate;
- private String name;
-
- public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
- this.stringRedisTemplate = stringRedisTemplate;
- this.name = name;
- }
-
- private static final String KEY_PREFIX="lock:";
-
- //获取锁
- @Override
- public boolean tryLock(long timeoutSec) {
- //获取线程标识
- long threadId = Thread.currentThread().getId();
- //获取锁,这里的返回值是一个布尔型的包装类,直接返回有时会出现空指针异常
- Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
- //和真比较,就不会出现异常
- return Boolean.TRUE.equals(success) ;
- }
-
- //释放锁
- @Override
- public void unlock() {
- stringRedisTemplate.delete(KEY_PREFIX+name);
- }
- }
- Long userId = UserHolder.getUser().getId();
-
- //创建锁对象
- SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
- //获取锁
- boolean isLock = lock.tryLock(12);
-
- //判断是否获取锁成功
- if (!isLock){
- //获取锁失败
- return Result.fail("一个人只能下一单");
- }
- try {
- //拿到当前对象的代理对象(事务的对象)
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- return proxy.createVoucherOrder(voucherId);//要想让事务生效,必须要有代理对象
- } finally {
- //释放锁
- lock.unlock();
- }
线程存在问题
线程阻塞超时自动删除后,线程完成释放别的线程的锁
存在的线程阻塞超时自动删除后,线程释放别的线程的锁

改进分布式锁(判断线程和存的是否一致)
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标示(可以用UUID表示)
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致如果一致则释放锁
如果不一致则不释放锁

- public void unlock() {
- //获取线程标识
- String threadId= ID_PREFIX+Thread.currentThread().getId();
- //获取锁中的标识
- String id=stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
- //判断两个是否一致,从而判断是否为同一线程
- if (threadId.equals(id)){
- stringRedisTemplate.delete(KEY_PREFIX+name);
- }
有并发安全分析
这里有个并发问题,即当判断完相同时,发生了阻塞,没来得及删除锁,被redis超时释放后,下一个线程来获取后,之前那个线程阻塞完成,就会释放掉锁,但是此时这把锁的拥有者不是他。
所以我们改进的是时候,应该保证,判断和删除在同一条语句中,即使用lua脚本可以保证原子性
Redis的Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站: https://www.runoob.com/lua/lua-tutorial.html语法如下:

如,我们要执行set name jack则脚本是这样

列如。我们要先执行set name Rose,在执行get name ,则脚本如下

需要用Redis命令来调用脚本,调用脚本的常见命令如下:

例如,我们要执行 redis.call('set','name','jack') 这个脚本,语法如下:

脚本中可以从KEYS和ARGV数组获取这些参数:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在
resoure下创建unlock.lua
- --比较线程标识与锁中是否一致
- if (redis.call('get',KEYS[1])==ARGV[1]) then
- -- 释放锁 del key
- return redis.call('del',KEYS[1])
- end
- return 0
- private static final String KEY_PREFIX = "lock:";
- private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
- private static final DefaultRedisScript
UNLOCK_SCRIPT; -
- static {
- UNLOCK_SCRIPT = new DefaultRedisScript<>();
- UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
- UNLOCK_SCRIPT.setResultType(Long.class);
- }
-
- //获取锁
- @Override
- public boolean tryLock(long timeoutSec) {
- //获取线程标识
- String threadId = ID_PREFIX + Thread.currentThread().getId();
- //获取锁,这里的返回值是一个布尔型的包装类,直接返回有时会出现空指针异常
- Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
- //和真比较,就不会出现异常
- return Boolean.TRUE.equals(success);
- }
-
- //释放锁
- @Override
- public void unlock() {
- //调用Lia
- stringRedisTemplate.execute(UNLOCK_SCRIPT,
- Collections.singletonList(KEY_PREFIX+name),
- ID_PREFIX+Thread.currentThread().getId());
- }
基于Redis的分布式锁实现思路
- 利用set nxex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用setnx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
还存在的问题
基于setnx实现的分布式锁还存在下面的问题
不可重入:同一个线程无法多次获取同一把锁
不可重试:获取锁只尝试一次就返回false,没有重试机制
超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
主从一致性:如果Redis提供了主从集群主从同步存在延迟,当主宕机时,如果从并同步主中的
锁数据,则会出现锁实现
Redission实现分布式锁
Redisson是一个在Redis的基础上实现的ava驻内存数据网格 (In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisso
Redisson入门
导入maven地址
-
-
-
org.redisson -
redisson -
3.13.6 -
创建配置文件
- package com.hmdp.config;
-
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.Config;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RedissonConfig {
-
- @Bean
- public RedissonClient redissonClient(){
- //配置
- Config config=new Config();
- config.useSingleServer().setAddress("redis://localhost:6379");
- //创建RedissonClient对象
- return Redisson.create(config);
- }
- }
- //创建锁对象
- // SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
- RLock lock = redissonClient.getLock("lock:order:" + userId);
-
- //获取锁,数量分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
- boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
- if(isLock){
- try {
- System.out.println("执行业务");}
- finally {
- // 释放锁
- lock.unlock();}
Redisson可重入锁原理

获取锁的Lua脚本

释放锁的Lua脚本

Redisson分布式锁的原理

Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:用于watchDog.每隔一段时间(releaseTime/3),重置超时时间
总结
1)不可重入Redis分布式锁
原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
缺陷:不可重入、无法重试、锁超时失效
2)可重入的Redis分布式锁:
原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷: redis宕机引起锁失效问题
3)Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
Redis秒杀优化(暂未实现)



达人探店
点赞功能

- /**
- * 博客点赞
- *
- * @param id
- * @return
- */
- @Override
- public Result likeBlog(Long id) {
- //获取blog实体
- Blog blog = getById(id);
- //获取博客id
- Long blogId = blog.getId();
- //获取登录当前用户id
- Long userId = UserHolder.getUser().getId();
-
- //拼接key
- String key = BLOG_LIKED_KEY + blogId;
- //去redis中看有没有点赞过,查询这个key是否存在
- Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
- //不存在,即没点赞过
- if (BooleanUtil.isFalse(isMember)) {
- //修改数据库,让liked加1
- boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();
- if (isSuccess) {
- //修改成功后,将自定义的blog的isLike(是否点赞过)改为true
- // blog.setIsLike(true);不能,因为这不是数据库中的字段存不了
- //将这个点赞信息加到redis缓存中
- stringRedisTemplate.opsForSet().add(key, userId.toString());
- }
- } else {
- //存在,即点赞过,则取消赞
- boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();
- if (isSuccess) {
- //修改,删除
- stringRedisTemplate.opsForSet().remove(key, userId.toString());
- }
- }
- return Result.ok();
- }
实现点赞排行榜功能
- /**
- * 博客点赞排序
- * @param id
- * @return
- */
- @Override
- public Result queryBlogLikes(long id) {
- String key =BLOG_LIKED_KEY+id;
- //1查询top5的点赞用户 zrange key 0 4
- Set
topRange = stringRedisTemplate.opsForZSet().range(key, 0, 4); - if (topRange==null||topRange.isEmpty()){
- return Result.ok(Collections.emptyList());
- }
- //2解析出其中的用户id
- List
ids = topRange.stream().map(Long::valueOf).collect(Collectors.toList()); - String idStr = StrUtil.join(",", ids);
- //3根据用户id查询用户where id in (5,1) order by field (id,5,1)
- List
userDTOS = userService.query().in("id", ids).last("order by field (id," + idStr + ")").list().stream() - .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
- .collect(Collectors.toList());
- return Result.ok(userDTOS);
- }
关注和取关

- @Override
- public Result follow(Long followUserId, boolean isFollow) {
- //1获取当前登录用户
- Long userId = UserHolder.getUser().getId();
- if (userId==null)return Result.fail("还没有登录");
- Follow follow = new Follow();
- //2判断为关注还是取关
- if (isFollow){
- //为关注,新增follow数据
- follow.setUserId(userId);
- follow.setFollowUserId(followUserId);
- save(follow);
- }else {
- //为取关,删除follow数据
- LambdaUpdateWrapper
updateWrapper=new LambdaUpdateWrapper(); - updateWrapper.eq(Follow::getUserId, userId)
- .eq(Follow::getFollowUserId,followUserId);
- //删除数据
- this.remove(updateWrapper);
-
- }
- return Result.ok();
- }
-
- @Override
- public Result isFollow(Long followUserId) {
- //1获取当前登录用户
- Long userId = UserHolder.getUser().getId();
- if (userId==null)return Result.fail("还没有登录");
- Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
- //判断是否有数据
- return Result.ok(count>0);
- }
查看共同关注
- /**
- * 被查看的人和我的共同关注
- *
- * @param checkedUserId
- * @return
- */
- @Override
- public Result commonConcernPerson(Long checkedUserId) {
- //1获取当前登录用户
- Long userId = UserHolder.getUser().getId();
- if (userId == null) return Result.fail("还没有登录");
- String key = "follows:" + userId;
- //2求交集
- String key2 = "follows:" + checkedUserId;
- Set
intersect = stringRedisTemplate.opsForSet().intersect(key, key2); - if (intersect==null||intersect.isEmpty()){
- //没有交集
- return Result.ok(Collections.emptyList());
- }
- //解析出id集合
- List
ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList()); - //4、根据id查询用户,转为userDto
- List
userDTOS = userService.listByIds(ids) - .stream()
- .map(user -> BeanUtil.copyProperties(user, UserDTO.class ))
- .collect(Collectors.toList());
- return Result.ok(userDTOS);
- }
关注推送
关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
feed流模式
Feed流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户> 优点: 投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用


对于普通人:直接发送到他的粉丝下
对于大v:活跃粉丝直接给他推送,不活跃粉丝放在收件箱,等他要读的时候推
feed流实现方案
需求
修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
查询收件箱数据时,可以实现分页查询
不能使用传统的分页,得使用滚动分页


-
- @GetMapping("/of/follow")
- public Result queryBlogOfFollow(
- @RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset){
- return blogService.queryBlogOfFollow(max,offset);
- }
- @Override
- public Result queryBlogOfFollow(Long max, Integer offset) {
- //1获取当前用户
- Long userId = UserHolder.getUser().getId();
- //2查询收件箱 ZREVRANGEBYSCORE key Max Min LIMiT offset count
- String key =FEED_KEY+userId;
- Set
> typedTuples = stringRedisTemplate.opsForZSet() - .reverseRangeByScoreWithScores(key, 0, max, offset, 2);
- //3判断是否为空
- if (typedTuples==null||typedTuples.isEmpty()){
- return Result.ok();
- }
- //4解析数据:blogId,minTime(时间戳),offset
- List
ids=new ArrayList<>(typedTuples.size()); - long minTime=0;
- int offNum=1;
- //统计有多次offNum(和最小的相同的个数)
- for (ZSetOperations.TypedTuple
tuple : typedTuples) { - //获取id
- ids.add(Long.valueOf(tuple.getValue()));
- //获取分数(时间戳)
- long time = tuple.getScore().longValue();
- if (time==minTime){
- offNum++;
- }else {
- minTime=time;
- offNum=1;
- }
- }
- //5,根据id查询blog
- String idStr =StrUtil.join(",", ids);
- List
blogs = query().in("id", ids).last("order by field (id," + idStr + ")").list(); -
- //把博客相关信息点赞填充
- for (Blog blog : blogs) {
- //查询blog有关用户
- queryBlogUser(blog);
- isLikeBlog(blog);
- }
-
- //5封装并返回
- ScrollResult scrollResult = new ScrollResult();
- scrollResult.setList(blogs);
- scrollResult.setOffset(offNum);
- scrollResult.setMinTime(minTime);
- return Result.ok(scrollResult);
- }
Redis最佳实践
Redis键值设计
优雅的key结构
Redis的key虽然可以自定义,但最好遵循下面结构最佳实践的约定:
- 遵循基本格式:[业务名称]:[数据名]:[id]
- 长度不超过44字节
- 不包含特殊字符
例如:在登录业务,保存用户信息,key是这样:login:user:1
优点:
- 可读强
- 避免key冲突
- 方便管理
更节省内存:key是string类型,底层编码包含int、embstr、和raw三种,embstr在小于44字节使用,采用连续的空间,内存占用更小