• 分布式多级缓存


    项目结构图

     运行反向代理服务器也就是负责反向代理到三个nginx的nginx,该nignx也负责前端页面的跳转。

    nginx的conf为下:

    突出位置就是该nginx需要反向代理的其他nginx的IP和端口。

    在资源比较有限的时候我们通常不适用上述的机构,而是用使用Caffeine进行二级缓存,在Cffeine没有查找到数据,我们才会去redis中查询数据。

    Caffeine是什么?

    Caffeine和redis都是内存级别的缓存,为什么要使用在这两缓存作为二级缓存,它们两有什么区别呢?

    虽然它们都是内存级别的缓存,redis是需要单独部署的,其需要一个单独的进程,在tomcat访问redis时需要网络通信的开销,而Caffeine跟我们项目代码是写在一起的,它是JVM级别的缓存,用的就是Java中的堆内存,无需网络的通信的开销,在Caffeine找不到数据后才会去redis中查找。

    Caffeine的使用

    导入依赖

    1. <!--jvm进程缓存-->
    2. <dependency>
    3. <groupId>com.github.ben-manes.caffeine</groupId>
    4. <artifactId>caffeine</artifactId>
    5. </dependency>

    进行测试

    1. import com.github.benmanes.caffeine.cache.Cache;
    2. import com.github.benmanes.caffeine.cache.Caffeine;
    3. import org.junit.jupiter.api.Test;
    4. import org.springframework.boot.test.context.SpringBootTest;
    5. @SpringBootTest
    6. public class test {
    7. @Test
    8. public void test1() {
    9. Cache<Object, Object> cache = Caffeine.newBuilder()
    10. .initialCapacity(100) //设置缓存的初始化容量
    11. .maximumSize(1000) //设置最大的容量
    12. .build();
    13. //向缓存中插入数据
    14. cache.put("key1", 123);
    15. //从缓存中取出数据
    16. Object value1 = cache.get("key1", key -> 456);
    17. System.out.println(value1);
    18. //获取没有的数据
    19. Object value2 = cache.get("key2", key -> 789);
    20. System.out.println(value2);
    21. }
    22. }

    驱逐策略(面试点: 使用Caffeine为了防止内存溢出,怎么做?)

    为了防止一直往内存里装数值导致占用内存,所以Caffeine给我们提供了驱逐策略。

    1.基于容量(设置缓存的上限)

    1. @Test
    2. public void test2() {
    3. Cache cache = Caffeine.newBuilder()
    4. .initialCapacity(100) //设置缓存的初始化容量
    5. .maximumSize(1000) //设置最大的容量
    6. .build();
    7. }

    通过设置最大的容量来控制内存,当内存达到最大时,会将最早存入的数据删除,当缓存超出这个容量的时候,会使用Window TinyLfu策略来删除缓存。

    2.基于时间(设置有效期)

    1. @Test
    2. public void test3() {
    3. Cache cache = Caffeine.newBuilder()
    4. .initialCapacity(100)
    5. .expireAfterWrite(Duration.ofSeconds(10)) //设置缓存的有效期,此时就是设置为10s
    6. .build();
    7. }

    3.基于引用:设置数据的强引用和弱引用,在内存不足的时候jvm会进行垃圾回收,会将弱引用的数据进行回收,性能差,不建议使用。

    设置一级缓存 

    Caffeine配置(配置到ioc中,后续提供依赖注入进行使用)

    1. import com.github.benmanes.caffeine.cache.Caffeine;
    2. import com.sl.transport.info.domain.TransportInfoDTO;
    3. import org.springframework.beans.factory.annotation.Value;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. /**
    7. * Caffeine缓存配置
    8. */
    9. @Configuration
    10. public class CaffeineConfig {
    11. //初始化的容量大小
    12. @Value("${caffeine.init}")
    13. private Integer init;
    14. //最大的容量大小
    15. @Value("${caffeine.max}")
    16. private Integer max;
    17. @Bean
    18. public Cache transportInfoCache() {
    19. return Caffeine.newBuilder()
    20. .initialCapacity(init)
    21. .maximumSize(max).build();
    22. }
    23. }

    在Controller层中设置一级缓存

    1. @Resource
    2. private TransportInfoService transportInfoService;
    3. @Resource
    4. private Cache transportInfoCache;
    5. /**
    6. * 根据运单id查询运单信息
    7. *
    8. * @param transportOrderId 运单号
    9. * @return 运单信息
    10. */
    11. @ApiImplicitParams({
    12. @ApiImplicitParam(name = "transportOrderId", value = "运单id")
    13. })
    14. @ApiOperation(value = "查询", notes = "根据运单id查询物流信息")
    15. @GetMapping("{transportOrderId}")
    16. public TransportInfoDTO queryByTransportOrderId(@PathVariable("transportOrderId") String transportOrderId) {
    17. //提供Caffeine先获取一级缓存,如果没有缓存就去Mongodb中查数据
    18. TransportInfoDTO transportInfoDTO = transportInfoCache.get(transportOrderId, id -> {
    19. TransportInfoEntity transportInfoEntity = transportInfoService.queryByTransportOrderId(transportOrderId);
    20. return BeanUtil.toBean(transportInfoEntity, TransportInfoDTO.class);
    21. });
    22. if(ObjectUtil.isNotEmpty(transportInfoDTO)) {
    23. return transportInfoDTO;
    24. }
    25. throw new SLException(ExceptionEnum.NOT_FOUND);
    26. }

    设置二级缓存(使用springCache进行二级缓存)

    配置springCache的配置(redis的配置)

    1. import org.springframework.beans.factory.annotation.Value;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.data.redis.cache.RedisCacheConfiguration;
    5. import org.springframework.data.redis.cache.RedisCacheManager;
    6. import org.springframework.data.redis.core.RedisTemplate;
    7. import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
    8. import org.springframework.data.redis.serializer.RedisSerializationContext;
    9. import org.springframework.data.redis.serializer.StringRedisSerializer;
    10. import java.time.Duration;
    11. /**
    12. * Redis相关的配置
    13. */
    14. @Configuration
    15. public class RedisConfig {
    16. /**
    17. * 存储的默认有效期时间,单位:小时
    18. */
    19. @Value("${redis.ttl:1}")
    20. private Integer redisTtl;
    21. @Bean
    22. public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
    23. // 默认配置
    24. RedisCacheConfiguration defaultCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
    25. // 设置key的序列化方式为字符串
    26. .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
    27. // 设置value的序列化方式为json格式
    28. .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()))
    29. .disableCachingNullValues() // 不缓存null
    30. .entryTtl(Duration.ofHours(redisTtl)); // 默认缓存数据保存1小时
    31. // 构redis缓存管理器
    32. RedisCacheManager redisCacheManager = RedisCacheManager.RedisCacheManagerBuilder
    33. .fromConnectionFactory(redisTemplate.getConnectionFactory())
    34. .cacheDefaults(defaultCacheConfiguration)
    35. .transactionAware() // 只在事务成功提交后才会进行缓存的put/evict操作
    36. .build();
    37. return redisCacheManager;
    38. }
    39. }

     在查询查找的Service上添加对应的注解

    1. @Override
    2. //该注解的在作用就是查询到的数据缓存到redis中其key值就为: transport-info::transportOrderId
    3. //注解其中key的值表示key拼接的参数,这里就是第一个参数
    4. @Cacheable(value = "transport-info", key = "#p0")
    5. public TransportInfoEntity queryByTransportOrderId(String transportOrderId) {
    6. //通过orderId创建查询条件,查询物流信息
    7. return mongoTemplate.findOne(
    8. Query.query(Criteria.where("transportOrderId").is(transportOrderId)),
    9. TransportInfoEntity.class
    10. );
    11. }

    添加此注解后,会先在redis的缓存中查找数据,如果有数据就直接返回数据,如果没有才会提供Mongodb查询。

    当然为了保证在数据修改后还能保证缓存的准确性,这里我们需要在修改操作上添加springCache的注解@CachePut。(该注解的作用就是更新缓存的数据,所以可以在缓存的增删改时添加该注解

    1. @Override
    2. @CachePut(value = "transport-info", key = "#p0")
    3. public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
    4. //通过orderId创建查询条件,查询物流信息是否存在
    5. TransportInfoEntity updateTransportInfoEntity = mongoTemplate.findOne(
    6. Query.query(Criteria.where("transportOrderId").is(transportOrderId)),
    7. TransportInfoEntity.class
    8. );
    9. if(ObjectUtil.isNotEmpty(updateTransportInfoEntity)) {
    10. //如果存在就获取对应的信息,在infoList中添加对应的物流信息
    11. updateTransportInfoEntity.getInfoList().add(infoDetail);
    12. } else {
    13. //如果不存在就新建一个document
    14. updateTransportInfoEntity = new TransportInfoEntity();
    15. updateTransportInfoEntity.setTransportOrderId(transportOrderId);
    16. updateTransportInfoEntity.setInfoList(ListUtil.toList(infoDetail));
    17. updateTransportInfoEntity.setCreated(System.currentTimeMillis());
    18. }
    19. //修改物流信息的修改时间
    20. updateTransportInfoEntity.setUpdated(System.currentTimeMillis());
    21. //进行新增或修改操作 id为空时就进行新增,不为空时进行修改操作
    22. return mongoTemplate.save(updateTransportInfoEntity);
    23. }

    一级缓存更新的问题

    修改后,在一级缓存中的数据是不变的,所以为了保证数据的准确性,我们先是想到在进行增删改的时候用this.transportInfoCache.invalidate(transportOrderId);来清除缓存但是在微服务的情况小会出现数据不一致的情况。(因为一级缓存在微服务间不是共享的)

    1. @Override
    2. //value和key就是对缓存中key的拼接,这里的key就是transport-info::对应的第一个参数
    3. @CachePut(value = "transport-info", key = "#p0")
    4. public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
    5. //通过orderId创建查询条件,查询物流信息是否存在
    6. TransportInfoEntity updateTransportInfoEntity = mongoTemplate.findOne(
    7. Query.query(Criteria.where("transportOrderId").is(transportOrderId)),
    8. TransportInfoEntity.class
    9. );
    10. if(ObjectUtil.isNotEmpty(updateTransportInfoEntity)) {
    11. //如果存在就获取对应的信息,在infoList中添加对应的物流信息
    12. updateTransportInfoEntity.getInfoList().add(infoDetail);
    13. } else {
    14. //如果不存在就新建一个document
    15. updateTransportInfoEntity = new TransportInfoEntity();
    16. updateTransportInfoEntity.setTransportOrderId(transportOrderId);
    17. updateTransportInfoEntity.setInfoList(ListUtil.toList(infoDetail));
    18. updateTransportInfoEntity.setCreated(System.currentTimeMillis());
    19. }
    20. //修改物流信息的修改时间
    21. updateTransportInfoEntity.setUpdated(System.currentTimeMillis());
    22. //清除缓存中的数据
    23. this.transportInfoCache.invalidate(transportOrderId);
    24. //进行新增或修改操作 id为空时就进行新增,不为空时进行修改操作
    25. return mongoTemplate.save(updateTransportInfoEntity);
    26. }

    为了解决此问题,我们引入了redis中的发布与订阅的功能来解决此问题。

    类似mq的机制,在发送对应的key也就是消息,然后订阅该消息的模块就会执行自定义的操作。

    在配置中增加订阅的配置

    1. import org.springframework.beans.factory.annotation.Value;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import org.springframework.data.redis.cache.RedisCacheConfiguration;
    5. import org.springframework.data.redis.cache.RedisCacheManager;
    6. import org.springframework.data.redis.connection.RedisConnectionFactory;
    7. import org.springframework.data.redis.core.RedisTemplate;
    8. import org.springframework.data.redis.listener.ChannelTopic;
    9. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    10. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    11. import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
    12. import org.springframework.data.redis.serializer.RedisSerializationContext;
    13. import org.springframework.data.redis.serializer.StringRedisSerializer;
    14. import java.time.Duration;
    15. /**
    16. * Redis相关的配置
    17. */
    18. @Configuration
    19. public class RedisConfig {
    20. /**
    21. * 存储的默认有效期时间,单位:小时
    22. */
    23. @Value("${redis.ttl:1}")
    24. private Integer redisTtl;
    25. @Bean
    26. public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
    27. // 默认配置
    28. RedisCacheConfiguration defaultCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
    29. // 设置key的序列化方式为字符串
    30. .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
    31. // 设置value的序列化方式为json格式
    32. .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()))
    33. .disableCachingNullValues() // 不缓存null
    34. .entryTtl(Duration.ofHours(redisTtl)); // 默认缓存数据保存1小时
    35. // 构redis缓存管理器
    36. RedisCacheManager redisCacheManager = RedisCacheManager.RedisCacheManagerBuilder
    37. .fromConnectionFactory(redisTemplate.getConnectionFactory())
    38. .cacheDefaults(defaultCacheConfiguration)
    39. .transactionAware() // 只在事务成功提交后才会进行缓存的put/evict操作
    40. .build();
    41. return redisCacheManager;
    42. }
    43. public static final String CHANNEL_TOPIC = "sl-express-ms-transport-info-caffeine";
    44. /**
    45. * 配置订阅,用于解决Caffeine一致性的问题
    46. *
    47. * @param connectionFactory 链接工厂
    48. * @param listenerAdapter 消息监听器
    49. * @return 消息监听容器
    50. */
    51. @Bean
    52. public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
    53. MessageListenerAdapter listenerAdapter) {
    54. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    55. container.setConnectionFactory(connectionFactory);
    56. container.addMessageListener(listenerAdapter, new ChannelTopic(CHANNEL_TOPIC));
    57. return container;
    58. }
    59. }

     编写RedisMessageListener用于监听消息(监听消息后执行的自定义方法),删除caffeine中的数据。(可以理解成监听方法)

    1. import cn.hutool.core.convert.Convert;
    2. import com.github.benmanes.caffeine.cache.Cache;
    3. import com.sl.transport.info.domain.TransportInfoDTO;
    4. import org.springframework.data.redis.connection.Message;
    5. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    6. import org.springframework.stereotype.Component;
    7. import javax.annotation.Resource;
    8. /**
    9. * redis消息监听,解决Caffeine一致性的问题
    10. */
    11. @Component
    12. public class RedisMessageListener extends MessageListenerAdapter {
    13. @Resource
    14. private Cache transportInfoCache;
    15. @Override
    16. public void onMessage(Message message, byte[] pattern) {
    17. //获取到消息中的运单id
    18. String transportOrderId = Convert.toStr(message);
    19. //将本jvm中的缓存删除掉
    20. this.transportInfoCache.invalidate(transportOrderId);
    21. }
    22. }

    在增删改的方法中向对应的频道发送消息。

    1. @Override
    2. //value和key就是对缓存中key的拼接,这里的key就是transport-info::对应的第一个参数
    3. @CachePut(value = "transport-info", key = "#p0")
    4. public TransportInfoEntity saveOrUpdate(String transportOrderId, TransportInfoDetail infoDetail) {
    5. //通过orderId创建查询条件,查询物流信息是否存在
    6. TransportInfoEntity updateTransportInfoEntity = mongoTemplate.findOne(
    7. Query.query(Criteria.where("transportOrderId").is(transportOrderId)),
    8. TransportInfoEntity.class
    9. );
    10. if(ObjectUtil.isNotEmpty(updateTransportInfoEntity)) {
    11. //如果存在就获取对应的信息,在infoList中添加对应的物流信息
    12. updateTransportInfoEntity.getInfoList().add(infoDetail);
    13. } else {
    14. //如果不存在就新建一个document
    15. updateTransportInfoEntity = new TransportInfoEntity();
    16. updateTransportInfoEntity.setTransportOrderId(transportOrderId);
    17. updateTransportInfoEntity.setInfoList(ListUtil.toList(infoDetail));
    18. updateTransportInfoEntity.setCreated(System.currentTimeMillis());
    19. }
    20. //修改物流信息的修改时间
    21. updateTransportInfoEntity.setUpdated(System.currentTimeMillis());
    22. //清除缓存中的数据
    23. this.stringRedisTemplate.convertAndSend(RedisConfig.CHANNEL_TOPIC, transportOrderId);
    24. //进行新增或修改操作 id为空时就进行新增,不为空时进行修改操作
    25. return mongoTemplate.save(updateTransportInfoEntity);
    26. }

    最终保证了一级缓存的准确性。

    问: 那redis的这种机制也可以完成mq的一系列操作,为什么微服务中没有大量使用呢?

    答:redis的发布订阅没有可靠性的处理,没有像mq那样的重试机制,所以我们微服务中没有大量使用。

  • 相关阅读:
    MOSN 反向通道详解
    【代码随想录】算法训练营 第十五天 第六章 二叉树 Part 2
    如何解决缓存一致性问题
    基于OCC+OSG的CAD之GMSH与Netgen网格连贯性测试
    Unsupervised Learning of Monocular Depth Estimation and Visual Odometry 论文阅读
    超越openmp通用核心的硬件
    Unity - 2D物理系统
    shell脚本编程基础(中)
    算法(第4版)练习题 1.1.27 的三种解法
    HTTP 错误 500.19 - Internal Server Error 无法访问请求的页面,因为该页的相关配置数据无效——错误代码:0x8007000d
  • 原文地址:https://blog.csdn.net/Ostkakah/article/details/132792579