• SpringBoot+Redis实现不重复消费的队列


         背景

            最近我们新研发了一个“年夜饭订购”功能(没想到吧,雷袭在是一个程序猿的同时,也是一名优秀的在厨子)。用户使用系统选择年夜饭,点击“下单”时,后台首先会生成一条订单数据,返回消息给用户:“您已成功下单,后厨正在准备菜品!”。同时,以线程的方式指挥各个厨子按菜单联系供应商准备食材,制作菜品,最后打包寄给客户。但是,用户在使用这个功能时,系统却有一定的机率卡死,这个问题极大的影响了用户的体验。年关将近,这个功能也显得越发重要,客户要求我们限期整改,三天内必须解决该问题。

            我首先对这个功能进行了分析,很明显,这是一个使用频次不高,但是使用时间比较集中的功能。在大量用户同时使用时,会导致后台的厨师,食材,供应商等全面告警(用程序员语言翻译一下,这个功能耗CPU,耗内存,耗IO)。但用户对于实时性的要求并不高。下单之后,订购的菜品是一天内完成,还是两天完成并没有关系,只要年前能做完就可以。

            因此,我们决定采用消息中间件的方式,以队列的形式逐次的执行“年夜饭制作”的操作, 来缓解服务器的各种资源的压力。

            之所以采用Redis来实现消息队列,而不是使用更为成熟的ONS,Kafka。不是因为ONS用不起,而是Redis更有性价比(用户只允许使用ONS中间件,但ONS会带来额外的网络开销,学习成本和风险都更大,这个功能使用频度并不高,没有必要为了它而引入一个重量级的中间件。)

         代码实践

            说干就干,咱们先看看源码,如下:

    1. // 订单实体类
    2. @Data
    3. public class OrderEntity implements Serializable {
    4. /**
    5. * 客户姓名
    6. */
    7. private String customerName;
    8. /**
    9. * 订单号
    10. */
    11. private String orderCode;
    12. /**
    13. * 菜单
    14. */
    15. List menus;
    16. }
    17. @Slf4j
    18. @Service
    19. public class DinnerService {
    20. /**
    21. * 年夜饭下单
    22. *
    23. * @param req 订单信息
    24. * @return
    25. */
    26. public Object orderNewYearEveDinner(OrderEntity entity) {
    27. // 存储订单信息
    28. saveOrder(entity);
    29. // 异步开始做菜
    30. CompletableFuture.runAsync(() -> doNewYearEveDinner(entity));
    31. return "您已成功下单,后厨正在准备预制菜!";
    32. }
    33. /**
    34. * 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒。
    35. * 这个过程中存在多种意外,可能导致该方法执行失败
    36. *
    37. * @param req 订单信息
    38. */
    39. public void doNewYearEveDinner(OrderEntity entity) {
    40. System.out.println("开始做订单 " + entity.getOrderCode() + " 的年夜饭");
    41. try {
    42. Thread.sleep(10000);
    43. }catch (Exception e ) {
    44. e.printStackTrace();
    45. System.out.println("厨子跑了,厨房着火了,供应商堵路上了");
    46. }
    47. System.out.println("订单 " + entity.getOrderCode() + " 的年夜饭已经完成");
    48. }
    49. private void saveOrder(OrderEntity req) {
    50. //这里假设做的是订单入库操作
    51. System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
    52. }
    53. }

            1、引入maven依赖,在application.yml中添加redis配置

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-data-redisartifactId>
    4. dependency>
    1. spring:
    2. redis:
    3. database: 9
    4. host: 127.0.0.1
    5. port: 6379
    6. password:
    7. jedis:
    8. pool:
    9. max-active: 8
    10. max-wait: -1
    11. max-idle: 8
    12. min-idle: 0

            2、添加Redis队列监听,添加Redis配置文件注册监听

    1. // 监听类
    2. @Component
    3. public class DinnerListener implements MessageListener {
    4. @Autowired
    5. private DinnerService service;
    6. @Override
    7. public void onMessage(Message message, byte[] pattern) {
    8. OrderEntity entity= JSON.parseObject(message.toString(), OrderEntity.class);
    9. service.doNewYearEveDinner(entity);
    10. }
    11. }
    12. //配置类,用于注册监听
    13. @Configuration
    14. public class RedisConfig {
    15. @Bean
    16. public ChannelTopic topic() {
    17. return new ChannelTopic("NEW_YEAR_DINNER");
    18. }
    19. @Bean
    20. public MessageListenerAdapter messageListenerAdapter(DinnerListener listener) {
    21. return new MessageListenerAdapter(listener);
    22. }
    23. @Bean
    24. public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory,
    25. MessageListenerAdapter messageListenerAdapter,
    26. ChannelTopic topic) {
    27. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    28. container.setConnectionFactory(redisConnectionFactory);
    29. container.addMessageListener(messageListenerAdapter, topic);
    30. return container;
    31. }
    32. }

            3、修改原方法,以及Controller调用

    1. // DinnerService中的方法修改
    2. /**
    3. * 年夜饭下单
    4. *
    5. * @param req 订单信息
    6. * @return
    7. */
    8. public Object orderNewYearEveDinner(OrderEntity entity) {
    9. // 存储订单信息
    10. saveOrder(entity);
    11. // 异步开始做菜
    12. redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(entity));
    13. return "您已成功下单,后厨正在准备预制菜!";
    14. }
    15. @RestController
    16. public class DinnerController {
    17. private int i = 0;
    18. @Autowired
    19. private DinnerService service;
    20. @GetMapping("/orderDinner")
    21. public Object orderDinner() {
    22. OrderEntity entity = new OrderEntity();
    23. entity.setOrderCode("Order" + (++i));
    24. entity.setCustomerName("第"+i+"位客户");
    25. return service.orderNewYearEveDinner(entity);
    26. }
    27. }

            4、通过postman调用四次请求,测试结果如下:

            5、Listener中添加同步锁

            细看上文中打出来的注释,我发现这和我设想的不一样啊。原定的计划是先做完第一份年夜饭,再做第二份,做完第二份再做第三份,为什么第一次没执行完就开始执行第二次了?

            在网上查了些资料后我才知道,要达到我想要的效果,得在Listener中添加上同步锁,如下:

    1. @Component
    2. public class DinnerListener implements MessageListener {
    3. @Autowired
    4. private DinnerService service;
    5. private final Object lock = new Object();
    6. @Override
    7. public void onMessage(Message message, byte[] pattern) {
    8. synchronized (lock) {
    9. OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class);
    10. service.doNewYearEveDinner(entity);
    11. }
    12. }
    13. }

            再次执行测试用例,结果如下:

            6、多服务不重复消费消息

            上面的结果已经满足了我们的要求,但是,客户考虑到我们只有一个厨房,的确影响效率,决定给我们扩建一个厨房(添加服务器),希望能达到厨房A做第一份订单,厨房B做第二份订单,以上的代码能实现吗?我们把刚才的项目拷贝一份,修改端口,启动后测试。结果如下:

            从上面的日志可以看出来,两个服务都做了订单1的年夜饭,消息被重复消费了。但是根据业务需求,我们不需要重复消费消息,我们想达到的效果是多服务实现负载均衡,本服务在处理的数据,其他服务不需要再处理了,应该怎么实现呢?咱们依然可以运用Redis,对代码做如下调整:

    1. @Component
    2. public class DinnerListener implements MessageListener {
    3. @Autowired
    4. private DinnerService service;
    5. @Autowired
    6. private RedisTemplate redisTemplate;
    7. private final Object lock = new Object();
    8. @Override
    9. public void onMessage(Message message, byte[] pattern) {
    10. synchronized (lock) {
    11. Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS);
    12. // 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
    13. if (!flag) {
    14. return;
    15. }
    16. OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class);
    17. service.doNewYearEveDinner(entity);
    18. }
    19. }
    20. }

            从测试结果来看,这么调整解决达到了我们的效果。

            7、添加日志监控

            仔细检查,发现上面的代码虽然满足了我们的业务需求,但是在安全方面仍然没有得到一定的保障,方法doNewYearEveDinner存在很多不可预见的隐患,如厨师跑了,厨房着了,供应商堵路上了,这些都会导致方法执行失败,那么,我们怎么知道这个订单执行成功或者失败了呢?看日志吗?成百上千条数据堆起来,通过看日志来看结果多不方便啊?咱们是否可以对代码做一下调整?基于这方面考虑,我对代码做了以下调整

    1. //订单类进行调整
    2. @Data
    3. public class OrderEntity implements Serializable {
    4. /**
    5. * 客户姓名
    6. */
    7. private String customerName;
    8. /**
    9. * 订单号
    10. */
    11. private String orderCode;
    12. /**
    13. * 菜单
    14. */
    15. List menus;
    16. /**
    17. * 出餐状态
    18. */
    19. private String dinnerState;
    20. /**
    21. * 做饭开始时间
    22. */
    23. private String dinnerStartTime;
    24. /**
    25. * 做饭结束时间
    26. */
    27. private String dinnerEndTime;
    28. /**
    29. * 备注
    30. */
    31. private String remark;
    32. }
    33. // DinnerService做如下调整, 添加一个订单信息更新的方法
    34. @Slf4j
    35. @Service
    36. public class DinnerService {
    37. @Autowired
    38. private RedisTemplate redisTemplate;
    39. /**
    40. * 年夜饭下单
    41. *
    42. * @param req 订单信息
    43. * @return
    44. */
    45. public Object orderNewYearEveDinner(OrderEntity req) {
    46. // 存储订单信息
    47. saveOrder(req);
    48. // 异步开始做菜
    49. redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(req));
    50. return "您已成功下单,订单号为"+ req.getOrderCode()+",后厨正在准备预制菜!";
    51. }
    52. /**
    53. * 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒,但是,这个过程中存在多种意外,该方法可能失败
    54. *
    55. * @param req 订单信息
    56. */
    57. public void doNewYearEveDinner(OrderEntity req) throws Exception {
    58. System.out.println("开始做订单 " + req.getOrderCode() + " 的年夜饭");
    59. Thread.sleep(10000);
    60. System.out.println("订单 " + req.getOrderCode() + " 的年夜饭已经完成");
    61. }
    62. private void saveOrder(OrderEntity req) {
    63. //这里假设做的是订单入库操作
    64. System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
    65. }
    66. /**
    67. * 根据订单编号修改订单信息
    68. *
    69. * @param orderCode 订单编号
    70. * @param dinnerStatus
    71. * @param remark
    72. */
    73. public void updateOrder(String orderCode, String dinnerStatus, String remark) {
    74. // 根据订单编号修改订单的出餐结束时间,出餐状态,备注等信息。
    75. System.out.println("更新订单 "+ orderCode +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为"+ dinnerStatus +", 备注为 " +remark);
    76. }
    77. }
    78. // Listener中做如下调整
    79. @Override
    80. public void onMessage(Message message, byte[] pattern) {
    81. synchronized (lock) {
    82. Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS);
    83. // 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
    84. if (!flag) {
    85. return;
    86. }
    87. OrderEntity param = JSON.parseObject(message.toString(), OrderEntity.class);
    88. try {
    89. service.doNewYearEveDinner(param);
    90. service.updateOrder(param.getOrderCode(), "SUCCESS", "成功");
    91. }catch (Exception e) {
    92. e.printStackTrace();
    93. service.updateOrder(param.getOrderCode(), "FAIL", e.getMessage());
    94. }
    95. }
    96. }

            这部分代码就不贴测试结果了,与上一次的测试结果一致,只不过提升了功能的可测试性,扩展一下,这个结果能否达到我们的要求呢?其实仍然没有,对于执行失败的订单,我们需要一个机制来处理,根据报错信息决定是重新执行还是直接报警,人为介入处理,由此才能实现整个事务的闭环。

            这是一次简单的SpringBoot+Redis实现队列的实践,个人觉得这个过程比较有趣,分析问题出现的原因,需求的潜在归约,根据业务的需要、当前的条件选择合适的方法和组件,快而有效的解决问题,所以我将它记录了下来,供大家参考。实际上,已经有大神对于Redis实现队列的方法进行了完整细致的归纳,如果想深入的了解这部分的知识,推荐你们看看这篇博客: Redis队列详解(springboot实战)

  • 相关阅读:
    dart 学习 之 Getters and setters
    DNS(域名解析系统)工作过程
    ChatGPT如何协助人们学习新的科学和技术概念?
    弹性伸缩:高可用架构利器(架构+算法+思维)
    解析隐式类型转换操作operator double() const,带你了解隐式转换危害有多大
    三、CSS中级
    MSTP理论讲解
    C语言——通讯录管理系统
    vue2+element UI 树形只有两级, 第一级只能上下移动,第二级不能成为第一级,只能拖到别的第一级和在同一级排序
    SpringBoot接口 - 如何优雅的对参数进行校验?
  • 原文地址:https://blog.csdn.net/m0_60681411/article/details/135895842